"""Engineer ~486 ENTSO-E features for FBMC forecasting. Transforms 8 ENTSO-E datasets into model-ready features: 1. Generation by PSR type (~228 features): - Individual PSR types (8 types × 12 zones × 2 = 192 features with lags) - Aggregates (total + shares = 36 features) 2. Demand/Load (24 features) 3. Day-ahead prices (24 features) 4. Hydro reservoir storage (12 features) 5. Pumped storage (10 features) 6. Load forecasts (12 features) 7. Transmission outages (176 features - ALL CNECs with EIC mapping) Total: ~486 features (generation outages not available in raw data) Author: Claude Date: 2025-11-10 """ from pathlib import Path from typing import Tuple, List import polars as pl import numpy as np # ========================================================================= # Feature Category 1: Generation by PSR Type # ========================================================================= def engineer_generation_features(generation_df: pl.DataFrame) -> pl.DataFrame: """Engineer ~228 generation features from PSR type data. Features per zone: - Individual PSR type generation (8 types × 2 = value + lag): 192 features - Total generation (sum across PSR types): 12 features - Renewable/Thermal shares: 24 features PSR Types: - B04: Fossil Gas - B05: Fossil Hard coal - B06: Fossil Oil - B11: Hydro Run-of-river - B12: Hydro Reservoir - B14: Nuclear - B16: Solar - B19: Wind Onshore Args: generation_df: Generation by PSR type (12 zones × 8 PSR types) Returns: DataFrame with generation features, indexed by timestamp """ print("\n[1/8] Engineering generation features...") # PSR type name mapping for clean feature names psr_name_map = { 'Fossil Gas': 'fossil_gas', 'Fossil Hard coal': 'fossil_coal', 'Fossil Oil': 'fossil_oil', 'Hydro Run-of-river and poundage': 'hydro_ror', 'Hydro Water Reservoir': 'hydro_reservoir', 'Nuclear': 'nuclear', 'Solar': 'solar', 'Wind Onshore': 'wind_onshore' } # Create individual PSR type features (8 PSR types × 12 zones = 96 base features) psr_features_list = [] for psr_name, psr_clean in psr_name_map.items(): # Filter data for this PSR type psr_data = generation_df.filter(pl.col('psr_name') == psr_name) if len(psr_data) > 0: # Pivot to wide format (one column per zone) psr_wide = psr_data.pivot( values='generation_mw', index='timestamp', on='zone', aggregate_function='sum' ) # Rename columns with PSR type prefix psr_cols = [c for c in psr_wide.columns if c != 'timestamp'] psr_wide = psr_wide.rename({c: f'gen_{psr_clean}_{c}' for c in psr_cols}) # Add lag features (t-1) for this PSR type lag_features = {} for col in psr_wide.columns: if col.startswith('gen_'): lag_features[f'{col}_lag1'] = pl.col(col).shift(1) psr_wide = psr_wide.with_columns(**lag_features) psr_features_list.append(psr_wide) # Aggregate features: Total generation per zone zone_total = generation_df.group_by(['timestamp', 'zone']).agg([ pl.col('generation_mw').sum().alias('total_gen') ]) total_gen_wide = zone_total.pivot( values='total_gen', index='timestamp', on='zone', aggregate_function='first' ).rename({c: f'gen_total_{c}' for c in zone_total['zone'].unique() if c != 'timestamp'}) # Aggregate features: Renewable and thermal shares zone_shares = generation_df.group_by(['timestamp', 'zone']).agg([ pl.col('generation_mw').sum().alias('total_gen'), pl.col('generation_mw').filter( pl.col('psr_name').is_in(['Wind Onshore', 'Solar', 'Hydro Run-of-river and poundage', 'Hydro Water Reservoir']) ).sum().alias('renewable_gen'), pl.col('generation_mw').filter( pl.col('psr_name').is_in(['Fossil Gas', 'Fossil Hard 
    # Aggregate features: Renewable and thermal shares
    zone_shares = generation_df.group_by(['timestamp', 'zone']).agg([
        pl.col('generation_mw').sum().alias('total_gen'),
        pl.col('generation_mw').filter(
            pl.col('psr_name').is_in(['Wind Onshore', 'Solar',
                                      'Hydro Run-of-river and poundage', 'Hydro Water Reservoir'])
        ).sum().alias('renewable_gen'),
        pl.col('generation_mw').filter(
            pl.col('psr_name').is_in(['Fossil Gas', 'Fossil Hard coal', 'Fossil Oil', 'Nuclear'])
        ).sum().alias('thermal_gen')
    ])

    zone_shares = zone_shares.with_columns([
        (pl.col('renewable_gen') / pl.col('total_gen').clip(lower_bound=1)).round(4).alias('renewable_share'),
        (pl.col('thermal_gen') / pl.col('total_gen').clip(lower_bound=1)).round(4).alias('thermal_share')
    ])

    renewable_share_wide = zone_shares.pivot(
        values='renewable_share',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    ).rename({c: f'gen_renewable_share_{c}' for c in zone_shares['zone'].unique() if c != 'timestamp'})

    thermal_share_wide = zone_shares.pivot(
        values='thermal_share',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    ).rename({c: f'gen_thermal_share_{c}' for c in zone_shares['zone'].unique() if c != 'timestamp'})

    # Merge all generation features
    features = total_gen_wide
    features = features.join(renewable_share_wide, on='timestamp', how='left')
    features = features.join(thermal_share_wide, on='timestamp', how='left')

    for psr_features in psr_features_list:
        features = features.join(psr_features, on='timestamp', how='left')

    print(f"  Created {len(features.columns) - 1} generation features")
    print(f"    - Individual PSR types: {sum([len(pf.columns) - 1 for pf in psr_features_list])} features (8 types x 12 zones x 2)")
    print(f"    - Aggregates: {len(total_gen_wide.columns) + len(renewable_share_wide.columns) + len(thermal_share_wide.columns) - 3} features")

    return features


# =========================================================================
# Feature Category 2: Demand/Load
# =========================================================================

def engineer_demand_features(demand_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~24 demand features.

    Features per zone:
    - Actual demand
    - Demand lag (t-1)

    Args:
        demand_df: Actual demand data (12 zones)

    Returns:
        DataFrame with demand features, indexed by timestamp
    """
    print("\n[2/8] Engineering demand features...")

    # FIX: Resample to hourly (some zones have 15-min data for 2025)
    demand_df = demand_df.with_columns([
        pl.col('timestamp').dt.truncate('1h').alias('timestamp')
    ])

    # Aggregate by hour (mean of sub-hourly values)
    demand_df = demand_df.group_by(['timestamp', 'zone']).agg([
        pl.col('load_mw').mean().alias('load_mw')
    ])

    # Pivot demand to wide format
    demand_wide = demand_df.pivot(
        values='load_mw',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )

    # Rename to demand_<zone>
    demand_cols = [c for c in demand_wide.columns if c != 'timestamp']
    demand_wide = demand_wide.rename({c: f'demand_{c}' for c in demand_cols})

    # Sort by time (group_by does not preserve order) so the t-1 lag refers to the previous hour
    demand_wide = demand_wide.sort('timestamp')

    # Add lag features (t-1)
    lag_features = {}
    for col in demand_wide.columns:
        if col.startswith('demand_'):
            lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
    features = demand_wide.with_columns(**lag_features)

    print(f"  Created {len(features.columns) - 1} demand features")

    return features
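
# Illustrative sketch (not part of the pipeline): shows how the hourly resampling in
# engineer_demand_features averages 15-minute readings into a single hourly value.
# Assumes the ['timestamp', 'zone', 'load_mw'] schema used above; the toy numbers
# and the _example_* name are hypothetical.
def _example_demand_resampling() -> pl.DataFrame:
    from datetime import datetime

    toy_demand = pl.DataFrame({
        'timestamp': [datetime(2025, 1, 1, 0, 0), datetime(2025, 1, 1, 0, 15),
                      datetime(2025, 1, 1, 0, 30), datetime(2025, 1, 1, 0, 45),
                      datetime(2025, 1, 1, 1, 0)],
        'zone': ['DE_LU'] * 5,
        'load_mw': [50_000.0, 51_000.0, 52_000.0, 51_000.0, 49_000.0],
    })

    features = engineer_demand_features(toy_demand)
    # The four 15-minute rows at 00:00 collapse to one hourly row with
    # demand_DE_LU = 51_000.0, and demand_DE_LU_lag1 at 01:00 carries that mean forward.
    return features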
# =========================================================================
# Feature Category 3: Day-Ahead Prices
# =========================================================================

def engineer_price_features(prices_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~24 price features.

    Features per zone:
    - Day-ahead price
    - Price lag (t-1)

    Args:
        prices_df: Day-ahead prices (12 zones)

    Returns:
        DataFrame with price features, indexed by timestamp
    """
    print("\n[3/8] Engineering price features...")

    # Pivot prices to wide format
    price_wide = prices_df.pivot(
        values='price_eur_mwh',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )

    # Rename to price_<zone>
    price_cols = [c for c in price_wide.columns if c != 'timestamp']
    price_wide = price_wide.rename({c: f'price_{c}' for c in price_cols})

    # Sort by time so the t-1 lag refers to the previous hour
    price_wide = price_wide.sort('timestamp')

    # Add lag features (t-1)
    lag_features = {}
    for col in price_wide.columns:
        if col.startswith('price_'):
            lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
    features = price_wide.with_columns(**lag_features)

    print(f"  Created {len(features.columns) - 1} price features")

    return features


# =========================================================================
# Feature Category 4: Hydro Reservoir Storage
# =========================================================================

def engineer_hydro_storage_features(hydro_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~12 hydro storage features (weekly data forward-filled to hourly).

    Features per zone with data (6 zones):
    - Hydro storage level (forward-filled to hourly)
    - Storage change (week-over-week)

    Args:
        hydro_df: Weekly hydro storage data (6 zones)

    Returns:
        DataFrame with hydro storage features, indexed by timestamp
    """
    print("\n[4/8] Engineering hydro storage features...")

    # Pivot to wide format
    hydro_wide = hydro_df.pivot(
        values='storage_mwh',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )

    # Rename to hydro_storage_<zone>
    hydro_cols = [c for c in hydro_wide.columns if c != 'timestamp']
    hydro_wide = hydro_wide.rename({c: f'hydro_storage_{c}' for c in hydro_cols})

    # Create hourly date range spanning the hydro data (Oct 2023 - Sept 2025)
    hourly_range = pl.DataFrame({
        'timestamp': pl.datetime_range(
            start=hydro_wide['timestamp'].min(),
            end=hydro_wide['timestamp'].max(),
            interval='1h',
            eager=True
        )
    })

    # Cast timestamp to match precision (datetime[μs])
    hydro_wide = hydro_wide.with_columns(
        pl.col('timestamp').cast(pl.Datetime('us'))
    )

    # Join onto the hourly grid; the weekly values are forward-filled below
    features = hourly_range.join(hydro_wide, on='timestamp', how='left')

    # Forward fill missing values (weekly data → hourly)
    for col in features.columns:
        if col.startswith('hydro_storage_'):
            features = features.with_columns(
                pl.col(col).forward_fill().alias(col)
            )

    # Add week-over-week change (168 hours = 1 week)
    change_features = {}
    for col in features.columns:
        if col.startswith('hydro_storage_'):
            change_features[f'{col}_change_w'] = pl.col(col) - pl.col(col).shift(168)
    features = features.with_columns(**change_features)

    print(f"  Created {len(features.columns) - 1} hydro storage features")

    return features
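
# Illustrative sketch (not part of the pipeline): shows the weekly → hourly
# forward-fill and the 168-hour change feature in engineer_hydro_storage_features.
# Assumes the ['timestamp', 'zone', 'storage_mwh'] schema used above; the toy
# values, zone code, and the _example_* name are hypothetical.
def _example_hydro_forward_fill() -> pl.DataFrame:
    from datetime import datetime

    toy_hydro = pl.DataFrame({
        'timestamp': [datetime(2023, 10, 2), datetime(2023, 10, 9), datetime(2023, 10, 16)],
        'zone': ['AT'] * 3,
        'storage_mwh': [1_200_000.0, 1_150_000.0, 1_100_000.0],
    })

    features = engineer_hydro_storage_features(toy_hydro)
    # Every hour between the weekly readings repeats the last known level;
    # hydro_storage_AT_change_w is null for the first 168 hours and then reflects
    # the week-over-week drop of 50_000 MWh.
    return features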
# =========================================================================
# Feature Category 5: Pumped Storage
# =========================================================================

def engineer_pumped_storage_features(pumped_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~10 pumped storage features.

    Features per zone with data (5 zones):
    - Pumped storage generation
    - Generation lag (t-1)

    Args:
        pumped_df: Pumped storage generation (5 zones)

    Returns:
        DataFrame with pumped storage features, indexed by timestamp
    """
    print("\n[5/8] Engineering pumped storage features...")

    # Pivot to wide format
    pumped_wide = pumped_df.pivot(
        values='generation_mw',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )

    # Rename to pumped_storage_<zone>
    pumped_cols = [c for c in pumped_wide.columns if c != 'timestamp']
    pumped_wide = pumped_wide.rename({c: f'pumped_storage_{c}' for c in pumped_cols})

    # Sort by time so the t-1 lag refers to the previous hour
    pumped_wide = pumped_wide.sort('timestamp')

    # Add lag features (t-1)
    lag_features = {}
    for col in pumped_wide.columns:
        if col.startswith('pumped_storage_'):
            lag_features[f'{col}_lag1'] = pl.col(col).shift(1)
    features = pumped_wide.with_columns(**lag_features)

    print(f"  Created {len(features.columns) - 1} pumped storage features")

    return features


# =========================================================================
# Feature Category 6: Load Forecasts
# =========================================================================

def engineer_load_forecast_features(forecast_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~12 load forecast features.

    Features per zone:
    - Load forecast (a forecast-error feature vs. actual load is not computed here)

    Args:
        forecast_df: Load forecasts (12 zones)

    Returns:
        DataFrame with load forecast features, indexed by timestamp
    """
    print("\n[6/8] Engineering load forecast features...")

    # FIX: Resample to hourly (some zones have 15-min data for 2025)
    forecast_df = forecast_df.with_columns([
        pl.col('timestamp').dt.truncate('1h').alias('timestamp')
    ])

    # Aggregate by hour (mean of sub-hourly values)
    forecast_df = forecast_df.group_by(['timestamp', 'zone']).agg([
        pl.col('forecast_mw').mean().alias('forecast_mw')
    ])

    # Pivot to wide format
    forecast_wide = forecast_df.pivot(
        values='forecast_mw',
        index='timestamp',
        on='zone',
        aggregate_function='first'
    )

    # Rename to load_forecast_<zone>
    forecast_cols = [c for c in forecast_wide.columns if c != 'timestamp']
    forecast_wide = forecast_wide.rename({c: f'load_forecast_{c}' for c in forecast_cols})

    print(f"  Created {len(forecast_wide.columns) - 1} load forecast features")

    return forecast_wide


# =========================================================================
# Feature Category 7: Transmission Outages (ALL 176 CNECs)
# =========================================================================

def engineer_transmission_outage_features(
    outages_df: pl.DataFrame,
    cnec_master_df: pl.DataFrame,
    hourly_range: pl.DataFrame
) -> pl.DataFrame:
    """Engineer 176 transmission outage features (ALL CNECs with EIC mapping).

    Creates binary feature for each CNEC:
    - 1 = Outage active on this CNEC at this timestamp
    - 0 = No outage

    Uses EIC codes from master CNEC list to map ENTSO-E outages to CNECs.
    31 CNECs have historical outages, 145 are zero-filled (ready for future).
    Args:
        outages_df: ENTSO-E transmission outages with Asset_RegisteredResource.mRID (EIC)
        cnec_master_df: Master CNEC list with cnec_eic column (176 rows)
        hourly_range: Hourly timestamp range (Oct 2023 - Sept 2025)

    Returns:
        DataFrame with 176 transmission outage features, indexed by timestamp
    """
    print("\n[7/8] Engineering transmission outage features (ALL 176 CNECs)...")

    # Create EIC → CNEC mapping from master list
    eic_to_cnec = dict(zip(cnec_master_df['cnec_eic'], cnec_master_df['cnec_name']))
    all_cnec_eics = cnec_master_df['cnec_eic'].to_list()

    print(f"  Loaded {len(all_cnec_eics)} CNECs from master list")

    # Process outages: expand start → end to hourly timestamps
    if len(outages_df) == 0:
        print("  WARNING: No transmission outages found in raw data")
        # Create zero-filled features for all CNECs
        features = hourly_range.clone()
        for eic in all_cnec_eics:
            features = features.with_columns(
                pl.lit(0).alias(f'outage_cnec_{eic}')
            )
        print(f"  Created {len(all_cnec_eics)} zero-filled outage features")
        return features

    # Parse outage periods (start_time → end_time)
    outages_expanded = []

    for row in outages_df.iter_rows(named=True):
        eic = row.get('asset_eic', None)
        start = row.get('start_time', None)
        end = row.get('end_time', None)

        if not eic or not start or not end:
            continue

        # Only process if EIC is in master CNEC list
        if eic not in all_cnec_eics:
            continue

        # Create hourly range for this outage
        outage_hours = pl.datetime_range(
            start=start,
            end=end,
            interval='1h',
            eager=True
        )

        for hour in outage_hours:
            outages_expanded.append({
                'timestamp': hour,
                'cnec_eic': eic,
                'outage_active': 1
            })

    print(f"  Expanded {len(outages_expanded)} hourly outage events")

    if len(outages_expanded) == 0:
        # No outages matched to CNECs - create zero-filled features
        features = hourly_range.clone()
        for eic in all_cnec_eics:
            features = features.with_columns(
                pl.lit(0).alias(f'outage_cnec_{eic}')
            )
        print(f"  Created {len(all_cnec_eics)} zero-filled outage features (no matches)")
        return features

    # Convert to Polars DataFrame
    outages_hourly = pl.DataFrame(outages_expanded)

    # Remove timezone from timestamp to match hourly_range
    outages_hourly = outages_hourly.with_columns(
        pl.col('timestamp').dt.replace_time_zone(None)
    )

    # Pivot to wide format (one column per CNEC)
    outages_wide = outages_hourly.pivot(
        values='outage_active',
        index='timestamp',
        on='cnec_eic',
        aggregate_function='max'  # If multiple outages overlap, max = 1
    )

    # Rename columns
    outage_cols = [c for c in outages_wide.columns if c != 'timestamp']
    outages_wide = outages_wide.rename({c: f'outage_cnec_{c}' for c in outage_cols})

    # Join with hourly range to ensure complete timestamp coverage
    features = hourly_range.join(outages_wide, on='timestamp', how='left')

    # Fill nulls with 0 (no outage)
    for col in features.columns:
        if col.startswith('outage_cnec_'):
            features = features.with_columns(
                pl.col(col).fill_null(0).alias(col)
            )

    # Add zero-filled features for CNECs with no historical outages
    existing_cnecs = [c.replace('outage_cnec_', '') for c in features.columns if c.startswith('outage_cnec_')]
    missing_cnecs = [eic for eic in all_cnec_eics if eic not in existing_cnecs]

    for eic in missing_cnecs:
        features = features.with_columns(
            pl.lit(0).alias(f'outage_cnec_{eic}')
        )

    cnecs_with_data = len(existing_cnecs)
    cnecs_zero_filled = len(missing_cnecs)

    print(f"  Created {len(features.columns) - 1} transmission outage features:")
    print(f"    - {cnecs_with_data} CNECs with historical outages")
    print(f"    - {cnecs_zero_filled} CNECs zero-filled (ready for future)")

    return features
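
# Illustrative sketch (not part of the pipeline): the row-by-row Python loop above
# could in principle be replaced by a vectorized expansion. This assumes
# pl.datetime_ranges is available in the installed Polars version and that outages_df
# carries the asset_eic / start_time / end_time columns read above; the function name
# is hypothetical and this is a sketch under those assumptions, not the author's method.
def _example_vectorized_outage_expansion(outages_df: pl.DataFrame, cnec_eics: list) -> pl.DataFrame:
    # Keep only outages whose EIC appears in the master CNEC list
    matched = outages_df.filter(pl.col('asset_eic').is_in(cnec_eics))

    # Build one list of hourly timestamps per outage row, then explode to long format
    expanded = (
        matched
        .with_columns(
            pl.datetime_ranges(pl.col('start_time'), pl.col('end_time'), interval='1h')
              .alias('timestamp')
        )
        .explode('timestamp')
        .select(
            pl.col('timestamp'),
            pl.col('asset_eic').alias('cnec_eic'),
            pl.lit(1).alias('outage_active'),
        )
    )
    return expanded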
# =========================================================================
# Feature Category 8: Generation Outages
# =========================================================================

def engineer_generation_outage_features(gen_outages_df: pl.DataFrame,
                                         hourly_range: pl.DataFrame) -> pl.DataFrame:
    """Engineer ~45 generation outage features (nuclear, coal, lignite by zone).

    Features per zone and PSR type:
    - Nuclear capacity offline (MW)
    - Coal capacity offline (MW)
    - Lignite capacity offline (MW)
    - Total outage count
    - Binary outage indicator

    Args:
        gen_outages_df: Generation unavailability data with columns:
            ['start_time', 'end_time', 'zone', 'psr_type', 'capacity_mw', 'unit_name']
        hourly_range: Hourly timestamps for alignment

    Returns:
        DataFrame with generation outage features, indexed by timestamp
    """
    print("\n[8/8] Engineering generation outage features...")

    if len(gen_outages_df) == 0:
        print("  WARNING: No generation outages found - creating zero-filled features")
        # Create zero-filled features for all zones
        features = hourly_range.select('timestamp')
        zones = ['FR', 'BE', 'CZ', 'HU', 'RO', 'SI', 'SK', 'DE_LU', 'PL']
        for zone in zones:
            features = features.with_columns([
                pl.lit(0.0).alias(f'gen_outage_nuclear_mw_{zone}'),
                pl.lit(0.0).alias(f'gen_outage_coal_mw_{zone}'),
                pl.lit(0.0).alias(f'gen_outage_lignite_mw_{zone}'),
                pl.lit(0).alias(f'gen_outage_count_{zone}'),
                pl.lit(0).alias(f'gen_outage_active_{zone}')
            ])
        print(f"  Created {len(features.columns) - 1} zero-filled generation outage features")
        return features

    # Expand outages to hourly granularity (outages span multiple hours)
    print("  Expanding outages to hourly timestamps...")

    # Create hourly records for each outage period
    hourly_outages = []
    for row in gen_outages_df.iter_rows(named=True):
        start = row['start_time']
        end = row['end_time']

        # Generate hourly timestamps for outage period
        outage_hours = pl.datetime_range(
            start=start,
            end=end,
            interval='1h',
            eager=True
        ).to_frame('timestamp')

        # Add outage metadata
        outage_hours = outage_hours.with_columns([
            pl.lit(row['zone']).alias('zone'),
            pl.lit(row['psr_type']).alias('psr_type'),
            pl.lit(row['capacity_mw']).alias('capacity_mw'),
            pl.lit(row['unit_name']).alias('unit_name')
        ])

        hourly_outages.append(outage_hours)

    # Combine all hourly outages
    hourly_outages_df = pl.concat(hourly_outages)
    print(f"  Expanded to {len(hourly_outages_df):,} hourly outage records")

    # Map PSR types to clean names
    psr_map = {
        'B14': 'nuclear',
        'B05': 'coal',
        'B02': 'lignite'
    }

    hourly_outages_df = hourly_outages_df.with_columns(
        pl.col('psr_type').replace(psr_map).alias('psr_clean')
    )

    # Create features for each PSR type
    all_features = [hourly_range.select('timestamp')]

    for psr_code, psr_name in psr_map.items():
        psr_outages = hourly_outages_df.filter(pl.col('psr_type') == psr_code)

        if len(psr_outages) > 0:
            # Aggregate capacity by zone and timestamp
            psr_agg = psr_outages.group_by(['timestamp', 'zone']).agg(
                pl.col('capacity_mw').sum().alias('capacity_mw')
            )

            # Pivot to wide format
            psr_wide = psr_agg.pivot(
                values='capacity_mw',
                index='timestamp',
                on='zone'
            )

            # Rename columns
            rename_map = {
                col: f'gen_outage_{psr_name}_mw_{col}'
                for col in psr_wide.columns if col != 'timestamp'
            }
            psr_wide = psr_wide.rename(rename_map)

            all_features.append(psr_wide)

    # Create aggregate count and binary indicator features
    total_agg = hourly_outages_df.group_by(['timestamp', 'zone']).agg([
        pl.col('unit_name').n_unique().alias('outage_count'),
        pl.lit(1).alias('outage_active')
    ])

    # Pivot count
    count_wide = total_agg.pivot(
        values='outage_count',
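
# Illustrative sketch (not part of the pipeline): calling the function with an empty
# DataFrame exercises the zero-filled fallback, which is also what the main pipeline
# relies on when the generation outages parquet is missing. The _example_* name is
# hypothetical; the hourly grid is a short toy range.
def _example_generation_outage_fallback() -> pl.DataFrame:
    from datetime import datetime

    toy_hourly_range = pl.DataFrame({
        'timestamp': pl.datetime_range(
            start=datetime(2023, 10, 1, 0),
            end=datetime(2023, 10, 1, 23),
            interval='1h',
            eager=True
        )
    })

    features = engineer_generation_outage_features(pl.DataFrame(), toy_hourly_range)
    # 9 zones × 5 features = 45 zero-filled columns (e.g. gen_outage_nuclear_mw_FR,
    # gen_outage_count_FR, gen_outage_active_FR), all constant zero.
    return features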
        index='timestamp',
        on='zone'
    ).rename({
        col: f'gen_outage_count_{col}'
        for col in total_agg['zone'].unique() if col != 'timestamp'
    })

    # Pivot binary indicator
    active_wide = total_agg.pivot(
        values='outage_active',
        index='timestamp',
        on='zone'
    ).rename({
        col: f'gen_outage_active_{col}'
        for col in total_agg['zone'].unique() if col != 'timestamp'
    })

    all_features.extend([count_wide, active_wide])

    # Join all features
    features = all_features[0]
    for feat_df in all_features[1:]:
        features = features.join(feat_df, on='timestamp', how='left')

    # Fill nulls with zeros (no outage)
    feature_cols = [col for col in features.columns if col != 'timestamp']
    features = features.with_columns([
        pl.col(col).fill_null(0) for col in feature_cols
    ])

    print(f"  Created {len(features.columns) - 1} generation outage features")
    print("    - Nuclear: capacity offline per zone")
    print("    - Coal: capacity offline per zone")
    print("    - Lignite: capacity offline per zone")
    print("    - Count: number of units offline per zone")
    print("    - Active: binary indicator per zone")

    return features


# =========================================================================
# Main Pipeline
# =========================================================================

def engineer_all_entsoe_features(
    data_dir: Path = Path("data/raw"),
    output_path: Path = Path("data/processed/features_entsoe_24month.parquet")
) -> pl.DataFrame:
    """Engineer all ENTSO-E features from 8 raw datasets.

    Args:
        data_dir: Directory containing raw ENTSO-E data
        output_path: Path to save engineered features

    Returns:
        DataFrame with ~486 ENTSO-E features (before redundancy cleanup), indexed by timestamp
    """
    print("\n" + "=" * 70)
    print("ENTSO-E FEATURE ENGINEERING PIPELINE")
    print("=" * 70)

    # Load master CNEC list (for transmission outage mapping)
    cnec_master = pl.read_csv("data/processed/cnecs_master_176.csv")
    print(f"\nLoaded master CNEC list: {len(cnec_master)} CNECs")

    # Create hourly timestamp range (Oct 2023 - Sept 2025)
    hourly_range = pl.DataFrame({
        'timestamp': pl.datetime_range(
            start=pl.datetime(2023, 10, 1, 0, 0),
            end=pl.datetime(2025, 9, 30, 23, 0),
            interval='1h',
            eager=True
        )
    })
    print(f"Created hourly range: {len(hourly_range)} timestamps")

    # Load raw ENTSO-E datasets
    generation_df = pl.read_parquet(data_dir / "entsoe_generation_by_psr_24month.parquet")
    demand_df = pl.read_parquet(data_dir / "entsoe_demand_24month.parquet")
    prices_df = pl.read_parquet(data_dir / "entsoe_prices_24month.parquet")
    hydro_df = pl.read_parquet(data_dir / "entsoe_hydro_storage_24month.parquet")
    pumped_df = pl.read_parquet(data_dir / "entsoe_pumped_storage_24month.parquet")
    forecast_df = pl.read_parquet(data_dir / "entsoe_load_forecast_24month.parquet")
    transmission_outages_df = pl.read_parquet(data_dir / "entsoe_transmission_outages_24month.parquet")

    # Check for generation outages file
    gen_outages_path = data_dir / "entsoe_generation_outages_24month.parquet"
    if gen_outages_path.exists():
        gen_outages_df = pl.read_parquet(gen_outages_path)
    else:
        print("\nWARNING: Generation outages file not found, skipping...")
        gen_outages_df = pl.DataFrame()

    print("\nLoaded 8 ENTSO-E datasets:")
    print(f"  - Generation: {len(generation_df):,} rows")
    print(f"  - Demand: {len(demand_df):,} rows")
    print(f"  - Prices: {len(prices_df):,} rows")
    print(f"  - Hydro storage: {len(hydro_df):,} rows")
    print(f"  - Pumped storage: {len(pumped_df):,} rows")
    print(f"  - Load forecast: {len(forecast_df):,} rows")
    print(f"  - Transmission outages: {len(transmission_outages_df):,} rows")
    print(f"  - Generation outages: {len(gen_outages_df):,} rows")
    # Engineer features for each category
    gen_features = engineer_generation_features(generation_df)
    demand_features = engineer_demand_features(demand_df)
    price_features = engineer_price_features(prices_df)
    hydro_features = engineer_hydro_storage_features(hydro_df)
    pumped_features = engineer_pumped_storage_features(pumped_df)
    forecast_features = engineer_load_forecast_features(forecast_df)
    transmission_outage_features = engineer_transmission_outage_features(
        transmission_outages_df, cnec_master, hourly_range
    )
    # Generation outages: the function itself handles an empty DataFrame by zero-filling
    gen_outage_features = engineer_generation_outage_features(gen_outages_df, hourly_range)

    # Merge all features on timestamp
    print("\n" + "=" * 70)
    print("MERGING ALL FEATURES")
    print("=" * 70)

    features = hourly_range.clone()

    # Standardize timestamps (remove timezone, cast to datetime[μs])
    def standardize_timestamp(df):
        """Remove timezone and cast timestamp to datetime[μs]."""
        if 'timestamp' in df.columns:
            # Check if timestamp has timezone
            if hasattr(df['timestamp'].dtype, 'time_zone') and df['timestamp'].dtype.time_zone is not None:
                df = df.with_columns(pl.col('timestamp').dt.replace_time_zone(None))
            # Cast to datetime[μs]
            df = df.with_columns(pl.col('timestamp').cast(pl.Datetime('us')))
        return df

    gen_features = standardize_timestamp(gen_features)
    demand_features = standardize_timestamp(demand_features)
    price_features = standardize_timestamp(price_features)
    hydro_features = standardize_timestamp(hydro_features)
    pumped_features = standardize_timestamp(pumped_features)
    forecast_features = standardize_timestamp(forecast_features)
    transmission_outage_features = standardize_timestamp(transmission_outage_features)

    # Join each feature set
    features = features.join(gen_features, on='timestamp', how='left')
    features = features.join(demand_features, on='timestamp', how='left')
    features = features.join(price_features, on='timestamp', how='left')
    features = features.join(hydro_features, on='timestamp', how='left')
    features = features.join(pumped_features, on='timestamp', how='left')
    features = features.join(forecast_features, on='timestamp', how='left')
    features = features.join(transmission_outage_features, on='timestamp', how='left')
    features = features.join(gen_outage_features, on='timestamp', how='left')

    print(f"\nFinal feature matrix shape: {features.shape}")
    print(f"  - Timestamps: {len(features):,}")
    print(f"  - Features: {len(features.columns) - 1:,}")

    # Check for nulls
    null_counts = features.null_count()
    total_nulls = sum([null_counts[col][0] for col in features.columns if col != 'timestamp'])
    null_pct = (total_nulls / (len(features) * (len(features.columns) - 1))) * 100

    print("\nData quality:")
    print(f"  - Total nulls: {total_nulls:,} ({null_pct:.2f}%)")
    print(f"  - Completeness: {100 - null_pct:.2f}%")

    # Clean up redundant features
    print("\n" + "=" * 70)
    print("CLEANING REDUNDANT FEATURES")
    print("=" * 70)

    features_before = len(features.columns) - 1

    # Remove 100% null features
    null_pcts = {}
    for col in features.columns:
        if col != 'timestamp':
            null_count = features[col].null_count()
            null_pct_col = (null_count / len(features)) * 100
            if null_pct_col >= 100.0:
                null_pcts[col] = null_pct_col

    if null_pcts:
        print(f"\nRemoving {len(null_pcts)} features with 100% nulls:")
        for col in null_pcts:
            print(f"  - {col}")
        features = features.drop(list(null_pcts.keys()))

    # Remove zero-variance features (constants)
    # EXCEPT transmission outage features - keep them even if zero-filled for future inference
    zero_var_cols = []
    for col in features.columns:
        if col != 'timestamp':
            # Skip transmission outage features (needed for future inference)
            if col.startswith('outage_cnec_'):
                continue
            # Check if all values are the same (excluding nulls)
            non_null = features[col].drop_nulls()
            if len(non_null) > 0 and non_null.n_unique() == 1:
                zero_var_cols.append(col)

    if zero_var_cols:
        print(f"\nRemoving {len(zero_var_cols)} zero-variance features (keeping transmission outages)")
        features = features.drop(zero_var_cols)

    # Remove duplicate columns
    # EXCEPT transmission outage features - keep all CNECs even if identical (all zeros)
    dup_groups = {}
    cols_to_check = [c for c in features.columns if c != 'timestamp']

    for i, col1 in enumerate(cols_to_check):
        if col1 in dup_groups:
            # Already marked as a duplicate of an earlier column
            continue
        # Skip transmission outage features (each CNEC needs its own column for inference)
        if col1.startswith('outage_cnec_'):
            continue
        for col2 in cols_to_check[i + 1:]:
            if col2 in dup_groups:
                # Already marked as a duplicate
                continue
            # Skip transmission outage features
            if col2.startswith('outage_cnec_'):
                continue
            # Check if columns are identical
            if features[col1].equals(features[col2]):
                dup_groups[col2] = col1  # col2 is duplicate of col1

    if dup_groups:
        print(f"\nRemoving {len(dup_groups)} duplicate columns (keeping transmission outages)")
        features = features.drop(list(dup_groups.keys()))

    features_after = len(features.columns) - 1
    features_removed = features_before - features_after

    print("\n[CLEANUP SUMMARY]")
    print(f"  Features before: {features_before}")
    print(f"  Features after: {features_after}")
    print(f"  Features removed: {features_removed} ({features_removed / features_before * 100:.1f}%)")

    # Save features
    output_path.parent.mkdir(parents=True, exist_ok=True)
    features.write_parquet(output_path)

    file_size_mb = output_path.stat().st_size / (1024 * 1024)
    print(f"\nSaved ENTSO-E features to: {output_path}")
    print(f"File size: {file_size_mb:.2f} MB")

    print("\n" + "=" * 70)
    print("ENTSO-E FEATURE ENGINEERING COMPLETE")
    print("=" * 70)

    return features


if __name__ == "__main__":
    # Run feature engineering pipeline
    features = engineer_all_entsoe_features()

    print("\n[SUCCESS] ENTSO-E features engineered and saved!")
    print(f"Feature count: {len(features.columns) - 1}")
    print(f"Timestamp range: {features['timestamp'].min()} to {features['timestamp'].max()}")