"""Engineer 375 weather features for FBMC forecasting.

Transforms OpenMeteo weather data into model-ready features:
1. Grid-level features (51 points × 7 vars = 357 features)
2. Temporal lags (3 vars × 4 time periods = 12 features)
3. Derived features (rate-of-change + stability = 6 features)

Total: 375 weather features

Weather Variables (7):
- temperature_2m (C)
- windspeed_10m (m/s)
- windspeed_100m (m/s) - for wind generation
- winddirection_100m (degrees)
- shortwave_radiation (W/m2) - for solar generation
- cloudcover (%)
- surface_pressure (hPa)

Author: Claude
Date: 2025-11-10
"""
from pathlib import Path

import polars as pl

# (raw column name, short prefix used in the wide feature names)
_WEATHER_VARS = (
    ('temperature_2m', 'temp'),
    ('windspeed_10m', 'wind10m'),
    ('windspeed_100m', 'wind100m'),
    ('winddirection_100m', 'winddir'),
    ('shortwave_radiation', 'solar'),
    ('cloudcover', 'cloud'),
    ('surface_pressure', 'pressure'),
)

_LAG_VARS = ('temp_avg', 'wind_avg', 'solar_avg')
_LAG_HOURS = (1, 6, 12, 24)
_SYSTEM_AVG_COLS = ('wind_system_avg', 'solar_system_avg', 'temp_system_avg')
_DERIVED_COLS = (
    'wind_rate_change', 'solar_rate_change', 'temp_rate_change',
    'wind_stability_6h', 'solar_stability_6h', 'temp_stability_6h',
)

# Every column name this module generates on top of the grid-level columns.
# Several of them share the 'temp_' / 'solar_' prefixes of the grid columns
# (e.g. 'temp_avg_lag1h'), so they must be excluded explicitly when grid
# columns are selected by prefix.
_NON_GRID_COLUMNS = frozenset(
    set(_LAG_VARS)
    | {f'{var}_lag{lag_h}h' for var in _LAG_VARS for lag_h in _LAG_HOURS}
    | set(_SYSTEM_AVG_COLS)
    | set(_DERIVED_COLS)
)


def _system_average(features: pl.DataFrame, prefix: str, alias: str) -> pl.Expr:
    """Build an expression for the row-wise mean of the grid columns with `prefix`.

    Columns generated by this module (averages, lags, derived features) are
    skipped so that only the per-grid-point columns enter the mean.

    Raises:
        ValueError: If `features` has no grid column starting with `prefix`.
    """
    grid_cols = [
        c for c in features.columns
        if c.startswith(prefix) and c not in _NON_GRID_COLUMNS
    ]
    if not grid_cols:
        raise ValueError(
            f"No grid-level columns starting with {prefix!r}; cannot compute {alias!r}"
        )
    return pl.concat_list([pl.col(c) for c in grid_cols]).list.mean().alias(alias)


def engineer_grid_level_features(weather_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer grid-level weather features (51 points × 7 vars = 357 features).

    For each weather variable, the long-format data is pivoted to one column
    per grid point, named `{prefix}_{grid_point}`:
    - temp_{grid_point}
    - wind10m_{grid_point}
    - wind100m_{grid_point}
    - winddir_{grid_point}
    - solar_{grid_point}
    - cloud_{grid_point}
    - pressure_{grid_point}

    Args:
        weather_df: Long-format frame with 'timestamp', 'grid_point' and the
            seven weather variable columns.

    Returns:
        Wide frame with 'timestamp' plus one column per (variable, grid point).
        Row order follows the pivot output and is NOT guaranteed to be
        chronological; sort by 'timestamp' before any order-dependent step.
    """
    print("\n[1/3] Engineering grid-level features (51 points × 7 vars)...")

    features = None
    for orig_col, short_name in _WEATHER_VARS:
        print(f" Pivoting {orig_col}...")
        pivoted = weather_df.select(['timestamp', 'grid_point', orig_col]).pivot(
            values=orig_col,
            index='timestamp',
            on='grid_point',
            aggregate_function='first',
        )

        # Prefix every grid-point column with the variable's short name.
        pivoted = pivoted.rename({
            col: f'{short_name}_{col}'
            for col in pivoted.columns
            if col != 'timestamp'
        })

        if features is None:
            features = pivoted
        else:
            features = features.join(pivoted, on='timestamp', how='left', coalesce=True)

    print(f" [OK] {len(features.columns) - 1} grid-level features")
    return features


def engineer_temporal_lags(features: pl.DataFrame) -> pl.DataFrame:
    """Add temporal lags of system-wide weather averages.

    Lags of 1h, 6h, 12h and 24h are added for:
    - Average temperature (4 lag features)
    - Average 100m wind speed (4 lag features)
    - Average solar radiation (4 lag features)

    Total: 12 lag features (3 vars × 4 lags), named `{var}_lag{h}h`.

    Lags are computed with a row shift, so `features` must already be sorted
    by timestamp. NOTE(review): a shift of N rows equals N hours only if the
    data has exactly one row per hour with no gaps - confirm for the input.

    Args:
        features: Wide frame containing temp_*, wind100m_* and solar_* grid
            columns, in chronological row order.

    Returns:
        `features` with the 12 lag columns appended; the intermediate
        averages are dropped.
    """
    print("\n[2/3] Engineering temporal lags (1h, 6h, 12h, 24h)...")

    # System-wide averages across all grid points (intermediate columns).
    features = features.with_columns([
        _system_average(features, 'temp_', 'temp_avg'),
        _system_average(features, 'wind100m_', 'wind_avg'),  # 100m: wind generation
        _system_average(features, 'solar_', 'solar_avg'),
    ])

    features = features.with_columns([
        pl.col(var).shift(lag_h).alias(f'{var}_lag{lag_h}h')
        for var in _LAG_VARS
        for lag_h in _LAG_HOURS
    ])

    # Keep only the lagged versions.
    features = features.drop(list(_LAG_VARS))

    lag_features = len(_LAG_VARS) * len(_LAG_HOURS)
    print(f" [OK] {lag_features} temporal lag features")
    return features


def engineer_derived_features(features: pl.DataFrame) -> pl.DataFrame:
    """Engineer derived weather features (6 features).

    Simple features without requiring calibration data:
    - Rate of change (row-over-row deltas): wind, solar, temperature
    - Weather stability (6-row rolling std): wind, solar, temperature

    Both are row-order dependent, so `features` must already be sorted by
    timestamp. The system averages are taken over grid-level columns only;
    lag columns such as 'temp_avg_lag1h' share the 'temp_' / 'solar_' prefix
    and are deliberately excluded so they do not distort the averages.

    Args:
        features: Wide frame containing temp_*, wind100m_* and solar_* grid
            columns, in chronological row order.

    Returns:
        `features` with the 6 derived columns appended; the intermediate
        system averages are dropped.
    """
    print("\n[3/3] Engineering derived features (rate-of-change + stability)...")

    features = features.with_columns([
        _system_average(features, 'wind100m_', 'wind_system_avg'),
        _system_average(features, 'solar_', 'solar_system_avg'),
        _system_average(features, 'temp_', 'temp_system_avg'),
    ])

    # Rate of change: captures sudden spikes/drops that correlate with
    # grid constraints.
    features = features.with_columns([
        pl.col('wind_system_avg').diff().alias('wind_rate_change'),
        pl.col('solar_system_avg').diff().alias('solar_rate_change'),
        pl.col('temp_system_avg').diff().alias('temp_rate_change'),
    ])

    # Weather stability: rolling std over 6 rows detects volatility periods
    # (useful for forecasting uncertainty).
    features = features.with_columns([
        pl.col('wind_system_avg').rolling_std(window_size=6).alias('wind_stability_6h'),
        pl.col('solar_system_avg').rolling_std(window_size=6).alias('solar_stability_6h'),
        pl.col('temp_system_avg').rolling_std(window_size=6).alias('temp_stability_6h'),
    ])

    features = features.drop(list(_SYSTEM_AVG_COLS))

    print(f" [OK] {len(_DERIVED_COLS)} derived features")
    return features


def engineer_weather_features(
    weather_path: Path,
    output_dir: Path
) -> pl.DataFrame:
    """Main feature engineering pipeline for weather data.

    Args:
        weather_path: Path to raw weather data (weather_24month.parquet)
        output_dir: Directory to save engineered features (created if missing)

    Returns:
        DataFrame with ~375 weather features, sorted by timestamp
    """
    print("=" * 80)
    print("WEATHER FEATURE ENGINEERING")
    print("=" * 80)
    print()
    print(f"Input: {weather_path}")
    print(f"Output: {output_dir}")
    print()

    # Load raw weather data
    print("Loading weather data...")
    weather_df = pl.read_parquet(weather_path)
    print(f" [OK] {weather_df.shape[0]:,} rows × {weather_df.shape[1]} columns")
    print(f" Date range: {weather_df['timestamp'].min()} to {weather_df['timestamp'].max()}")
    print()

    # 1. Grid-level features (51 × 7 = 357 features)
    all_features = engineer_grid_level_features(weather_df)

    # Sort BEFORE the lag / diff / rolling steps: they operate on row order,
    # and the pivot output is not guaranteed to be chronological.
    all_features = all_features.sort('timestamp')

    # 2. Temporal lags (12 features)
    all_features = engineer_temporal_lags(all_features)

    # 3. Derived features (6 features: rate-of-change + stability)
    all_features = engineer_derived_features(all_features)

    # Final validation
    print("\n" + "=" * 80)
    print("FEATURE ENGINEERING COMPLETE")
    print("=" * 80)
    print(f"Total features: {all_features.shape[1] - 1} (excluding timestamp)")
    print(f"Total rows: {len(all_features):,}")

    # Check completeness (share of non-null cells); an empty frame reports
    # 0% instead of dividing by zero.
    total_cells = all_features.shape[0] * all_features.shape[1]
    null_count_total = all_features.null_count().sum_horizontal()[0]
    if total_cells:
        completeness = (1 - null_count_total / total_cells) * 100
    else:
        completeness = 0.0
    print(f"Completeness: {completeness:.2f}%")
    print()

    # Save features
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / 'features_weather_24month.parquet'
    all_features.write_parquet(output_path)
    file_size_mb = output_path.stat().st_size / (1024 ** 2)
    print(f"Features saved: {output_path}")
    print(f"File size: {file_size_mb:.2f} MB")
    print("=" * 80)
    print()

    return all_features


def main():
    """Run the weather feature pipeline on data/raw under the current directory.

    Raises:
        FileNotFoundError: If data/raw/weather_24month.parquet does not exist.
    """
    # Paths
    base_dir = Path.cwd()
    raw_dir = base_dir / 'data' / 'raw'
    processed_dir = base_dir / 'data' / 'processed'
    weather_path = raw_dir / 'weather_24month.parquet'

    # Verify file exists
    if not weather_path.exists():
        raise FileNotFoundError(f"Weather data not found: {weather_path}")

    # Engineer features
    engineer_weather_features(weather_path, processed_dir)
    print("SUCCESS: Weather features engineered and saved to data/processed/")


if __name__ == '__main__':
    main()