| """Unified Features Generation - Checkpoint-Based Workflow | |
| Combines JAO (1,737) + ENTSO-E (297) + Weather (376) features = 2,410 total features | |
| Executes step-by-step with checkpoints for fast debugging. | |
| Author: Claude | |
| Date: 2025-11-11 | |
| """ | |
| import sys | |
| from pathlib import Path | |
| import polars as pl | |
| from datetime import datetime | |
# Paths
BASE_DIR = Path(__file__).parent.parent
RAW_DIR = BASE_DIR / 'data' / 'raw'
PROCESSED_DIR = BASE_DIR / 'data' / 'processed'

# Input files
JAO_FILE = PROCESSED_DIR / 'features_jao_24month.parquet'
ENTSOE_FILE = PROCESSED_DIR / 'features_entsoe_24month.parquet'
WEATHER_FILE = PROCESSED_DIR / 'features_weather_24month.parquet'

# Output files
UNIFIED_FILE = PROCESSED_DIR / 'features_unified_24month.parquet'
METADATA_FILE = PROCESSED_DIR / 'features_unified_metadata.csv'
| print("="*80) | |
| print("UNIFIED FEATURES GENERATION - CHECKPOINT WORKFLOW") | |
| print("="*80) | |
| print() | |
| # ============================================================================ | |
| # CHECKPOINT 1: Load Input Files | |
| # ============================================================================ | |
| print("[CHECKPOINT 1] Loading input files...") | |
| print() | |
| try: | |
| jao_raw = pl.read_parquet(JAO_FILE) | |
| print(f"[OK] JAO features loaded: {jao_raw.shape[0]:,} rows x {jao_raw.shape[1]} cols") | |
| entsoe_raw = pl.read_parquet(ENTSOE_FILE) | |
| print(f"[OK] ENTSO-E features loaded: {entsoe_raw.shape[0]:,} rows x {entsoe_raw.shape[1]} cols") | |
| weather_raw = pl.read_parquet(WEATHER_FILE) | |
| print(f"[OK] Weather features loaded: {weather_raw.shape[0]:,} rows x {weather_raw.shape[1]} cols") | |
| print() | |
| except Exception as e: | |
| print(f"[ERROR] Failed to load input files: {e}") | |
| sys.exit(1) | |
# ============================================================================
# CHECKPOINT 2: Standardize Timestamps
# ============================================================================
print("[CHECKPOINT 2] Standardizing timestamps...")
print()

try:
    # JAO: convert mtu to a tz-naive UTC timestamp with microsecond precision
    jao_std = jao_raw.with_columns([
        pl.col('mtu').dt.convert_time_zone('UTC').dt.replace_time_zone(None).dt.cast_time_unit('us').alias('timestamp')
    ]).drop('mtu')
    print("[OK] JAO timestamps standardized")

    # ENTSO-E: strip timezone, ensure microsecond precision
    entsoe_std = entsoe_raw.with_columns([
        pl.col('timestamp').dt.replace_time_zone(None).dt.cast_time_unit('us')
    ])
    print("[OK] ENTSO-E timestamps standardized")

    # Weather: strip timezone, ensure microsecond precision
    weather_std = weather_raw.with_columns([
        pl.col('timestamp').dt.replace_time_zone(None).dt.cast_time_unit('us')
    ])
    print("[OK] Weather timestamps standardized")
    print()
except Exception as e:
    print(f"[ERROR] Timestamp standardization failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
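
# Why the casts above matter: Polars treats Datetime('us') and, say,
# Datetime('ns', 'UTC') as different dtypes, and join keys must match exactly,
# so normalizing all three frames to tz-naive microsecond precision up front
# avoids schema errors at merge time.
# Optional sanity check (illustrative addition, not part of the original flow):
for _name, _df in [('JAO', jao_std), ('ENTSO-E', entsoe_std), ('Weather', weather_std)]:
    assert _df.schema['timestamp'] == pl.Datetime('us'), f"{_name}: unexpected timestamp dtype"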
# ============================================================================
# CHECKPOINT 3: Find Common Date Range
# ============================================================================
print("[CHECKPOINT 3] Finding common date range...")
print()

try:
    jao_min, jao_max = jao_std['timestamp'].min(), jao_std['timestamp'].max()
    entsoe_min, entsoe_max = entsoe_std['timestamp'].min(), entsoe_std['timestamp'].max()
    weather_min, weather_max = weather_std['timestamp'].min(), weather_std['timestamp'].max()
    print(f"JAO range:     {jao_min} to {jao_max}")
    print(f"ENTSO-E range: {entsoe_min} to {entsoe_max}")
    print(f"Weather range: {weather_min} to {weather_max}")
    print()

    common_min = max(jao_min, entsoe_min, weather_min)
    common_max = min(jao_max, entsoe_max, weather_max)
    print(f"[OK] Common range: {common_min} to {common_max}")
    print()
except Exception as e:
    print(f"[ERROR] Date range calculation failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
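
# Defensive guard (illustrative addition): if the three ranges were disjoint,
# common_min would exceed common_max and every downstream filter would return
# zero rows without an obvious error, so fail fast here instead.
if common_min > common_max:
    print("[ERROR] Input date ranges do not overlap")
    sys.exit(1)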
# ============================================================================
# CHECKPOINT 4: Filter to Common Range
# ============================================================================
print("[CHECKPOINT 4] Filtering to common date range...")
print()

try:
    jao_filtered = jao_std.filter(
        (pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
    ).sort('timestamp')
    print(f"[OK] JAO filtered: {jao_filtered.shape[0]:,} rows")

    entsoe_filtered = entsoe_std.filter(
        (pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
    ).sort('timestamp')
    print(f"[OK] ENTSO-E filtered: {entsoe_filtered.shape[0]:,} rows")

    weather_filtered = weather_std.filter(
        (pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
    ).sort('timestamp')
    print(f"[OK] Weather filtered: {weather_filtered.shape[0]:,} rows")
    print()
except Exception as e:
    print(f"[ERROR] Filtering failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
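
# Sanity check (illustrative addition): after trimming to the same closed
# interval, hourly data from all three sources should line up row for row;
# a mismatch means some hours are missing and the merge below will drop them.
if not (jao_filtered.height == entsoe_filtered.height == weather_filtered.height):
    print("[WARN] Row counts differ across sources; unmatched hours will be dropped at merge")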
# ============================================================================
# CHECKPOINT 5: Merge Datasets
# ============================================================================
print("[CHECKPOINT 5] Merging datasets on timestamp...")
print()

try:
    # Start with JAO, then inner-join the other sources on timestamp.
    # A keyed join is used rather than hstack: hstack pastes columns side by
    # side purely by position, which silently misaligns rows if any source
    # is missing an hour, whereas a join aligns rows by key.
    unified_df = jao_filtered.join(entsoe_filtered, on='timestamp', how='inner')
    print(f"[OK] ENTSO-E merged: {unified_df.shape[1]} total columns")

    # Joins do not guarantee row order, so re-sort before downstream checks
    unified_df = unified_df.join(weather_filtered, on='timestamp', how='inner').sort('timestamp')
    print(f"[OK] Weather merged: {unified_df.shape[1]} total columns")
    print()

    print(f"Final unified shape: {unified_df.shape[0]:,} rows x {unified_df.shape[1]} columns")
    print()
except Exception as e:
    print(f"[ERROR] Merge failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
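
# Illustrative check: an inner join can only grow the row count if a source
# carries duplicate timestamps, so confirm the merged frame is unique per hour.
_dups = unified_df.height - unified_df['timestamp'].n_unique()
if _dups > 0:
    print(f"[WARN] {_dups} duplicated timestamps after merge")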
# ============================================================================
# CHECKPOINT 6: Data Quality Check
# ============================================================================
print("[CHECKPOINT 6] Running data quality checks...")
print()

try:
    # Check for nulls
    null_counts = unified_df.null_count()
    total_nulls = null_counts.sum_horizontal()[0]
    total_cells = unified_df.shape[0] * unified_df.shape[1]
    completeness = (1 - total_nulls / total_cells) * 100
    print(f"Data completeness: {completeness:.2f}%")
    print(f"Total null values: {total_nulls:,} / {total_cells:,}")
    print()

    # Check timestamp continuity (hourly data: consecutive diffs should be 1 hour)
    timestamps = unified_df['timestamp'].sort()
    time_diffs = timestamps.diff().dt.total_hours()
    gaps = time_diffs.filter(time_diffs.is_not_null() & (time_diffs != 1))
    print("Timestamp continuity check:")
    print(f"  - Total timestamps: {len(timestamps):,}")
    print(f"  - Gaps detected: {len(gaps)}")
    print(f"  - Continuous: {'YES' if len(gaps) == 0 else 'NO'}")
    print()
except Exception as e:
    print(f"[ERROR] Quality check failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
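
# Illustrative extension: when gaps exist, show where. Pair each sorted
# timestamp with its hour-delta and print the first few irregular steps.
if len(gaps) > 0:
    _gap_frame = pl.DataFrame({'timestamp': timestamps, 'hours_since_prev': time_diffs})
    print(_gap_frame.filter(pl.col('hours_since_prev') != 1).head(5))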
# ============================================================================
# CHECKPOINT 7: Save Unified Features
# ============================================================================
print("[CHECKPOINT 7] Saving unified features file...")
print()

try:
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    unified_df.write_parquet(UNIFIED_FILE)
    file_size_mb = UNIFIED_FILE.stat().st_size / (1024 * 1024)
    print(f"[OK] Saved to: {UNIFIED_FILE}")
    print(f"[OK] File size: {file_size_mb:.1f} MB")
    print()
except Exception as e:
    print(f"[ERROR] Save failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
# ============================================================================
# CHECKPOINT 8: Generate Feature Metadata
# ============================================================================
print("[CHECKPOINT 8] Generating feature metadata...")
print()

try:
    # Create metadata catalog
    feature_cols = [c for c in unified_df.columns if c != 'timestamp']
    metadata_rows = []
    for i, col in enumerate(feature_cols, 1):
        # Determine category from column name
        if col.startswith('border_'):
            category = 'JAO_Border'
        elif col.startswith('cnec_'):
            category = 'JAO_CNEC'
        elif '_lta_' in col:
            category = 'LTA'
        elif '_load_forecast_' in col:
            category = 'Load_Forecast'
        elif '_gen_outage_' in col or '_tx_outage_' in col:
            category = 'Outages'
        elif any(col.startswith(prefix) for prefix in ['AT_', 'BE_', 'CZ_', 'DE_', 'FR_', 'HR_', 'HU_', 'NL_', 'PL_', 'RO_', 'SI_', 'SK_']):
            category = 'Weather'
        else:
            category = 'Other'
        metadata_rows.append({
            'feature_index': i,
            'feature_name': col,
            'category': category,
            'null_count': unified_df[col].null_count(),
            'dtype': str(unified_df[col].dtype),
        })

    metadata_df = pl.DataFrame(metadata_rows)
    metadata_df.write_csv(METADATA_FILE)
    print(f"[OK] Saved metadata: {METADATA_FILE}")
    print(f"[OK] Total features: {len(feature_cols)}")
    print()

    # Category breakdown (pl.len() replaces the deprecated pl.count())
    category_counts = metadata_df.group_by('category').agg(pl.len().alias('count')).sort('count', descending=True)
    print("Feature breakdown by category:")
    for row in category_counts.iter_rows(named=True):
        print(f"  - {row['category']}: {row['count']}")
    print()
except Exception as e:
    print(f"[ERROR] Metadata generation failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
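
# Usage note (illustrative): the metadata catalog makes downstream feature
# selection straightforward, e.g. to pull only the weather column names:
#   weather_cols = pl.read_csv(METADATA_FILE).filter(
#       pl.col('category') == 'Weather')['feature_name'].to_list()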
# ============================================================================
# FINAL SUMMARY
# ============================================================================
print("="*80)
print("UNIFIED FEATURES GENERATION COMPLETE")
print("="*80)
print()
print(f"Output file: {UNIFIED_FILE}")
print(f"Shape: {unified_df.shape[0]:,} rows x {unified_df.shape[1]} columns")
print(f"Date range: {unified_df['timestamp'].min()} to {unified_df['timestamp'].max()}")
print(f"Data completeness: {completeness:.2f}%")
print(f"File size: {file_size_mb:.1f} MB")
print()
print("[SUCCESS] All checkpoints passed!")
print()