"""Unified Features Generation - Checkpoint-Based Workflow Combines JAO (1,737) + ENTSO-E (297) + Weather (376) features = 2,410 total features Executes step-by-step with checkpoints for fast debugging. Author: Claude Date: 2025-11-11 """ import sys from pathlib import Path import polars as pl from datetime import datetime # Paths BASE_DIR = Path(__file__).parent.parent RAW_DIR = BASE_DIR / 'data' / 'raw' PROCESSED_DIR = BASE_DIR / 'data' / 'processed' # Input files JAO_FILE = PROCESSED_DIR / 'features_jao_24month.parquet' ENTSOE_FILE = PROCESSED_DIR / 'features_entsoe_24month.parquet' WEATHER_FILE = PROCESSED_DIR / 'features_weather_24month.parquet' # Output files UNIFIED_FILE = PROCESSED_DIR / 'features_unified_24month.parquet' METADATA_FILE = PROCESSED_DIR / 'features_unified_metadata.csv' print("="*80) print("UNIFIED FEATURES GENERATION - CHECKPOINT WORKFLOW") print("="*80) print() # ============================================================================ # CHECKPOINT 1: Load Input Files # ============================================================================ print("[CHECKPOINT 1] Loading input files...") print() try: jao_raw = pl.read_parquet(JAO_FILE) print(f"[OK] JAO features loaded: {jao_raw.shape[0]:,} rows x {jao_raw.shape[1]} cols") entsoe_raw = pl.read_parquet(ENTSOE_FILE) print(f"[OK] ENTSO-E features loaded: {entsoe_raw.shape[0]:,} rows x {entsoe_raw.shape[1]} cols") weather_raw = pl.read_parquet(WEATHER_FILE) print(f"[OK] Weather features loaded: {weather_raw.shape[0]:,} rows x {weather_raw.shape[1]} cols") print() except Exception as e: print(f"[ERROR] Failed to load input files: {e}") sys.exit(1) # ============================================================================ # CHECKPOINT 2: Standardize Timestamps # ============================================================================ print("[CHECKPOINT 2] Standardizing timestamps...") print() try: # JAO: Convert mtu to UTC timestamp (remove timezone, use microseconds) jao_std = jao_raw.with_columns([ pl.col('mtu').dt.convert_time_zone('UTC').dt.replace_time_zone(None).dt.cast_time_unit('us').alias('timestamp') ]).drop('mtu') print(f"[OK] JAO timestamps standardized") # ENTSO-E: Remove timezone, ensure microsecond precision entsoe_std = entsoe_raw.with_columns([ pl.col('timestamp').dt.replace_time_zone(None).dt.cast_time_unit('us') ]) print(f"[OK] ENTSO-E timestamps standardized") # Weather: Remove timezone, ensure microsecond precision weather_std = weather_raw.with_columns([ pl.col('timestamp').dt.replace_time_zone(None).dt.cast_time_unit('us') ]) print(f"[OK] Weather timestamps standardized") print() except Exception as e: print(f"[ERROR] Timestamp standardization failed: {e}") import traceback traceback.print_exc() sys.exit(1) # ============================================================================ # CHECKPOINT 3: Find Common Date Range # ============================================================================ print("[CHECKPOINT 3] Finding common date range...") print() try: jao_min, jao_max = jao_std['timestamp'].min(), jao_std['timestamp'].max() entsoe_min, entsoe_max = entsoe_std['timestamp'].min(), entsoe_std['timestamp'].max() weather_min, weather_max = weather_std['timestamp'].min(), weather_std['timestamp'].max() print(f"JAO range: {jao_min} to {jao_max}") print(f"ENTSO-E range: {entsoe_min} to {entsoe_max}") print(f"Weather range: {weather_min} to {weather_max}") print() common_min = max(jao_min, entsoe_min, weather_min) common_max = min(jao_max, entsoe_max, weather_max) print(f"[OK] 
# ============================================================================
# CHECKPOINT 3: Find Common Date Range
# ============================================================================
print("[CHECKPOINT 3] Finding common date range...")
print()

try:
    jao_min, jao_max = jao_std['timestamp'].min(), jao_std['timestamp'].max()
    entsoe_min, entsoe_max = entsoe_std['timestamp'].min(), entsoe_std['timestamp'].max()
    weather_min, weather_max = weather_std['timestamp'].min(), weather_std['timestamp'].max()

    print(f"JAO range: {jao_min} to {jao_max}")
    print(f"ENTSO-E range: {entsoe_min} to {entsoe_max}")
    print(f"Weather range: {weather_min} to {weather_max}")
    print()

    common_min = max(jao_min, entsoe_min, weather_min)
    common_max = min(jao_max, entsoe_max, weather_max)
    print(f"[OK] Common range: {common_min} to {common_max}")
    print()
except Exception as e:
    print(f"[ERROR] Date range calculation failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)

# ============================================================================
# CHECKPOINT 4: Filter to Common Range
# ============================================================================
print("[CHECKPOINT 4] Filtering to common date range...")
print()

try:
    jao_filtered = jao_std.filter(
        (pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
    ).sort('timestamp')
    print(f"[OK] JAO filtered: {jao_filtered.shape[0]:,} rows")

    entsoe_filtered = entsoe_std.filter(
        (pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
    ).sort('timestamp')
    print(f"[OK] ENTSO-E filtered: {entsoe_filtered.shape[0]:,} rows")

    weather_filtered = weather_std.filter(
        (pl.col('timestamp') >= common_min) & (pl.col('timestamp') <= common_max)
    ).sort('timestamp')
    print(f"[OK] Weather filtered: {weather_filtered.shape[0]:,} rows")
    print()
except Exception as e:
    print(f"[ERROR] Filtering failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)

# ============================================================================
# CHECKPOINT 5: Merge Datasets
# ============================================================================
print("[CHECKPOINT 5] Merging datasets horizontally...")
print()

try:
    # Start with JAO (keeps the timestamp column)
    unified_df = jao_filtered

    # Stack ENTSO-E features horizontally; rows are assumed to be aligned on
    # timestamp after filtering and sorting, so the duplicate column is dropped
    entsoe_to_join = entsoe_filtered.drop('timestamp')
    unified_df = unified_df.hstack(entsoe_to_join)
    print(f"[OK] ENTSO-E merged: {unified_df.shape[1]} total columns")

    # Stack Weather features horizontally (same alignment assumption)
    weather_to_join = weather_filtered.drop('timestamp')
    unified_df = unified_df.hstack(weather_to_join)
    print(f"[OK] Weather merged: {unified_df.shape[1]} total columns")
    print()

    print(f"Final unified shape: {unified_df.shape[0]:,} rows x {unified_df.shape[1]} columns")
    print()
except Exception as e:
    print(f"[ERROR] Merge failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)
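# ----------------------------------------------------------------------------
# Optional alignment check (added sketch, not part of the original workflow):
# the hstack merge above pairs rows positionally, so it silently assumes the
# three filtered frames carry identical timestamp sequences. Verify that
# assumption explicitly and warn if it does not hold.
# ----------------------------------------------------------------------------
try:
    aligned = bool(
        (jao_filtered['timestamp'] == entsoe_filtered['timestamp']).all()
        and (jao_filtered['timestamp'] == weather_filtered['timestamp']).all()
    )
    if aligned:
        print("[OK] Timestamp columns are row-aligned across all three sources")
    else:
        print("[WARN] Timestamp columns are NOT row-aligned; the hstack merge "
              "may have paired mismatched rows")
    print()
except Exception as e:
    print(f"[WARN] Alignment check skipped: {e}")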
# ============================================================================
# CHECKPOINT 6: Data Quality Check
# ============================================================================
print("[CHECKPOINT 6] Running data quality checks...")
print()

try:
    # Null counts and overall completeness
    null_counts = unified_df.null_count()
    total_nulls = null_counts.sum_horizontal()[0]
    total_cells = unified_df.shape[0] * unified_df.shape[1]
    completeness = (1 - total_nulls / total_cells) * 100

    print(f"Data completeness: {completeness:.2f}%")
    print(f"Total null values: {total_nulls:,} / {total_cells:,}")
    print()

    # Timestamp continuity: expect a constant 1-hour step with no gaps
    timestamps = unified_df['timestamp'].sort()
    time_diffs = timestamps.diff().dt.total_hours()
    gaps = time_diffs.filter((time_diffs.is_not_null()) & (time_diffs != 1))

    print("Timestamp continuity check:")
    print(f"  - Total timestamps: {len(timestamps):,}")
    print(f"  - Gaps detected: {len(gaps)}")
    print(f"  - Continuous: {'YES' if len(gaps) == 0 else 'NO'}")
    print()
except Exception as e:
    print(f"[ERROR] Quality check failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)

# ============================================================================
# CHECKPOINT 7: Save Unified Features
# ============================================================================
print("[CHECKPOINT 7] Saving unified features file...")
print()

try:
    PROCESSED_DIR.mkdir(parents=True, exist_ok=True)
    unified_df.write_parquet(UNIFIED_FILE)

    file_size_mb = UNIFIED_FILE.stat().st_size / (1024 * 1024)
    print(f"[OK] Saved to: {UNIFIED_FILE}")
    print(f"[OK] File size: {file_size_mb:.1f} MB")
    print()
except Exception as e:
    print(f"[ERROR] Save failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)

# ============================================================================
# CHECKPOINT 8: Generate Feature Metadata
# ============================================================================
print("[CHECKPOINT 8] Generating feature metadata...")
print()

try:
    # Build a metadata catalog covering every feature column
    feature_cols = [c for c in unified_df.columns if c != 'timestamp']

    metadata_rows = []
    for i, col in enumerate(feature_cols, 1):
        # Determine category from the column name
        if col.startswith('border_'):
            category = 'JAO_Border'
        elif col.startswith('cnec_'):
            category = 'JAO_CNEC'
        elif '_lta_' in col:
            category = 'LTA'
        elif '_load_forecast_' in col:
            category = 'Load_Forecast'
        elif '_gen_outage_' in col or '_tx_outage_' in col:
            category = 'Outages'
        elif any(col.startswith(prefix) for prefix in [
            'AT_', 'BE_', 'CZ_', 'DE_', 'FR_', 'HR_',
            'HU_', 'NL_', 'PL_', 'RO_', 'SI_', 'SK_'
        ]):
            category = 'Weather'
        else:
            category = 'Other'

        metadata_rows.append({
            'feature_index': i,
            'feature_name': col,
            'category': category,
            'null_count': unified_df[col].null_count(),
            'dtype': str(unified_df[col].dtype)
        })

    metadata_df = pl.DataFrame(metadata_rows)
    metadata_df.write_csv(METADATA_FILE)
    print(f"[OK] Saved metadata: {METADATA_FILE}")
    print(f"[OK] Total features: {len(feature_cols)}")
    print()

    # Category breakdown
    category_counts = (
        metadata_df
        .group_by('category')
        .agg(pl.count().alias('count'))
        .sort('count', descending=True)
    )
    print("Feature breakdown by category:")
    for row in category_counts.iter_rows(named=True):
        print(f"  - {row['category']}: {row['count']}")
    print()
except Exception as e:
    print(f"[ERROR] Metadata generation failed: {e}")
    import traceback
    traceback.print_exc()
    sys.exit(1)

# ============================================================================
# FINAL SUMMARY
# ============================================================================
print("=" * 80)
print("UNIFIED FEATURES GENERATION COMPLETE")
print("=" * 80)
print()
print(f"Output file: {UNIFIED_FILE}")
print(f"Shape: {unified_df.shape[0]:,} rows x {unified_df.shape[1]} columns")
print(f"Date range: {unified_df['timestamp'].min()} to {unified_df['timestamp'].max()}")
print(f"Data completeness: {completeness:.2f}%")
print(f"File size: {file_size_mb:.1f} MB")
print()
print("[SUCCESS] All checkpoints passed!")
print()
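# ----------------------------------------------------------------------------
# Example downstream usage (added sketch, not part of the original workflow):
# the metadata CSV written in CHECKPOINT 8 can be used to select a single
# feature category without loading all 2,400+ columns from the parquet file.
# ----------------------------------------------------------------------------
weather_cols = (
    pl.read_csv(METADATA_FILE)
    .filter(pl.col('category') == 'Weather')['feature_name']
    .to_list()
)
print(f"[INFO] Example: load only weather features via "
      f"pl.read_parquet(UNIFIED_FILE, columns=['timestamp'] + weather_cols) "
      f"({len(weather_cols)} columns)")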