Spaces:
Sleeping
Sleeping
Evgueni Poloukarov
feat: complete weather feature engineering with simplified approach (375 features)
7aa0336
| """Engineer 375 Weather features for FBMC forecasting. | |
| Transforms OpenMeteo weather data into model-ready features: | |
| 1. Grid-level features (51 points × 7 vars = 357 features) | |
| 2. Temporal lags (3 vars × 4 time periods = 12 features) | |
| 3. Derived features (rate-of-change + stability = 6 features) | |
| Total: 375 weather features | |
| Weather Variables (7): | |
| - temperature_2m (C) | |
| - windspeed_10m (m/s) | |
| - windspeed_100m (m/s) - for wind generation | |
| - winddirection_100m (degrees) | |
| - shortwave_radiation (W/m2) - for solar generation | |
| - cloudcover (%) | |
| - surface_pressure (hPa) | |
| Author: Claude | |
| Date: 2025-11-10 | |
| """ | |
| from pathlib import Path | |
| import polars as pl | |
def engineer_grid_level_features(weather_df: pl.DataFrame) -> pl.DataFrame:
    """Engineer grid-level weather features (51 points × 7 vars = 357 features).

    Pivots each of the 7 raw weather variables from long format
    (timestamp, grid_point, value) to wide format, one column per grid
    point, named <short_name>_<grid_point>:
    - temp_<grid_point>
    - wind10m_<grid_point>
    - wind100m_<grid_point>
    - winddir_<grid_point>
    - solar_<grid_point>
    - cloud_<grid_point>
    - pressure_<grid_point>

    Args:
        weather_df: Long-format weather data with 'timestamp', 'grid_point'
            and the 7 raw variable columns.

    Returns:
        Wide DataFrame with a 'timestamp' column plus one column per
        (variable, grid point) pair.
    """
    # Fixed: label previously said "[1/5]" while the pipeline's other steps
    # print "[2/3]" and "[3/3]" — this is step 1 of 3.
    print("\n[1/3] Engineering grid-level features (51 points × 7 vars)...")
    # (raw column name, short prefix used in feature names)
    weather_vars = [
        ('temperature_2m', 'temp'),
        ('windspeed_10m', 'wind10m'),
        ('windspeed_100m', 'wind100m'),
        ('winddirection_100m', 'winddir'),
        ('shortwave_radiation', 'solar'),
        ('cloudcover', 'cloud'),
        ('surface_pressure', 'pressure')
    ]
    features = None
    for orig_col, short_name in weather_vars:
        print(f" Pivoting {orig_col}...")
        # 'first' aggregation: assumes one reading per (timestamp, grid_point)
        # pair — TODO confirm against the raw extract.
        pivoted = weather_df.select(['timestamp', 'grid_point', orig_col]).pivot(
            values=orig_col,
            index='timestamp',
            on='grid_point',
            aggregate_function='first'
        )
        # Prefix grid-point columns so each variable gets its own namespace.
        rename_map = {
            col: f'{short_name}_{col}'
            for col in pivoted.columns
            if col != 'timestamp'
        }
        pivoted = pivoted.rename(rename_map)
        # Accumulate variables side by side, aligned on timestamp.
        if features is None:
            features = pivoted
        else:
            features = features.join(pivoted, on='timestamp', how='left', coalesce=True)
    print(f" [OK] {len(features.columns) - 1} grid-level features")
    return features
def engineer_temporal_lags(features: pl.DataFrame) -> pl.DataFrame:
    """Add temporal lags of system-wide weather averages (12 features).

    Computes the cross-grid-point mean of temperature, 100m wind speed and
    solar radiation, then lags each average by 1h, 6h, 12h and 24h
    (3 vars × 4 lags = 12 features). The intermediate averages are dropped;
    only the lagged versions are kept.

    NOTE(review): shift()-based lags assume rows are hourly, contiguous and
    already sorted by timestamp — confirm upstream ordering.

    Args:
        features: Wide feature frame from engineer_grid_level_features.

    Returns:
        The input frame with 12 additional *_lag{1,6,12,24}h columns.
    """
    print("\n[2/3] Engineering temporal lags (1h, 6h, 12h, 24h)...")
    # Map: average-column name -> grid-column prefix to average over.
    avg_sources = {
        'temp_avg': 'temp_',
        'wind_avg': 'wind100m_',  # 100m wind — relevant for wind generation
        'solar_avg': 'solar_',
    }
    # mean_horizontal is the idiomatic (and cheaper) replacement for
    # concat_list(...).list.mean(); both compute a per-row mean ignoring nulls.
    features = features.with_columns([
        pl.mean_horizontal(
            [c for c in features.columns if c.startswith(prefix)]
        ).alias(avg_name)
        for avg_name, prefix in avg_sources.items()
    ])
    lag_hours = [1, 6, 12, 24]
    # Build all 12 lag columns in a single projection instead of 12
    # sequential with_columns calls.
    features = features.with_columns([
        pl.col(avg_name).shift(lag_h).alias(f'{avg_name}_lag{lag_h}h')
        for avg_name in avg_sources
        for lag_h in lag_hours
    ])
    # Drop intermediate averages (keep only lagged versions).
    features = features.drop(list(avg_sources))
    print(f" [OK] {len(avg_sources) * len(lag_hours)} temporal lag features")
    return features
def engineer_derived_features(features: pl.DataFrame) -> pl.DataFrame:
    """Engineer derived weather features (6 features).

    Simple features that require no calibration data:
    - Rate of change (hour-over-hour deltas): wind, solar, temperature
    - Weather stability (6-hour rolling std): wind, solar, temperature

    NOTE(review): diff()/rolling_std() assume hourly, gap-free rows sorted
    by timestamp — confirm upstream ordering.

    Args:
        features: Feature frame after engineer_temporal_lags.

    Returns:
        The input frame with 6 additional *_rate_change / *_stability_6h
        columns.
    """
    print("\n[3/3] Engineering derived features (rate-of-change + stability)...")

    def _grid_cols(prefix: str) -> list[str]:
        # BUG FIX: a bare startswith() scan also matched the lag columns
        # added by engineer_temporal_lags (e.g. 'temp_avg_lag1h' starts with
        # 'temp_', 'solar_avg_lag6h' with 'solar_'), polluting the system
        # averages. Restrict to genuine grid-point columns.
        return [
            c for c in features.columns
            if c.startswith(prefix) and '_lag' not in c
        ]

    # System-wide averages (per-row mean across grid points, nulls ignored).
    features = features.with_columns([
        pl.mean_horizontal(_grid_cols('wind100m_')).alias('wind_system_avg'),
        pl.mean_horizontal(_grid_cols('solar_')).alias('solar_system_avg'),
        pl.mean_horizontal(_grid_cols('temp_')).alias('temp_system_avg')
    ])
    # Rate of change (hour-over-hour deltas): captures sudden spikes/drops
    # that correlate with grid constraints.
    features = features.with_columns([
        pl.col('wind_system_avg').diff().alias('wind_rate_change'),
        pl.col('solar_system_avg').diff().alias('solar_rate_change'),
        pl.col('temp_system_avg').diff().alias('temp_rate_change')
    ])
    # Weather stability: 6-hour rolling std detects volatility periods
    # (useful for forecasting uncertainty).
    features = features.with_columns([
        pl.col('wind_system_avg').rolling_std(window_size=6).alias('wind_stability_6h'),
        pl.col('solar_system_avg').rolling_std(window_size=6).alias('solar_stability_6h'),
        pl.col('temp_system_avg').rolling_std(window_size=6).alias('temp_stability_6h')
    ])
    # Drop intermediate columns.
    features = features.drop(['wind_system_avg', 'solar_system_avg', 'temp_system_avg'])
    derived_cols = ['wind_rate_change', 'solar_rate_change', 'temp_rate_change',
                    'wind_stability_6h', 'solar_stability_6h', 'temp_stability_6h']
    print(f" [OK] {len(derived_cols)} derived features")
    return features
def engineer_weather_features(
    weather_path: Path,
    output_dir: Path
) -> pl.DataFrame:
    """Main feature engineering pipeline for weather data.

    Loads raw long-format weather data, builds grid-level, lag and derived
    features, and writes the result to
    <output_dir>/features_weather_24month.parquet.

    Args:
        weather_path: Path to raw weather data (weather_24month.parquet)
        output_dir: Directory to save engineered features

    Returns:
        DataFrame with 375 weather features (357 grid-level + 12 lags +
        6 derived) plus a 'timestamp' column.
        (Docstring previously claimed ~435 features; the pipeline produces
        357 + 12 + 6 = 375.)
    """
    print("=" * 80)
    print("WEATHER FEATURE ENGINEERING")
    print("=" * 80)
    print()
    print(f"Input: {weather_path}")
    print(f"Output: {output_dir}")
    print()
    # Load raw weather data
    print("Loading weather data...")
    weather_df = pl.read_parquet(weather_path)
    print(f" [OK] {weather_df.shape[0]:,} rows × {weather_df.shape[1]} columns")
    print(f" Date range: {weather_df['timestamp'].min()} to {weather_df['timestamp'].max()}")
    print()
    # 1. Grid-level features (51 × 7 = 357 features)
    all_features = engineer_grid_level_features(weather_df)
    # BUG FIX: sort by timestamp BEFORE computing shift()/diff()/rolling
    # features. The original sorted only at the very end, so order-sensitive
    # lag and derived features were computed on whatever row order the
    # pivot happened to produce.
    all_features = all_features.sort('timestamp')
    # 2. Temporal lags (12 features)
    all_features = engineer_temporal_lags(all_features)
    # 3. Derived features (6 features: rate-of-change + stability)
    all_features = engineer_derived_features(all_features)
    # Final validation
    print("\n" + "=" * 80)
    print("FEATURE ENGINEERING COMPLETE")
    print("=" * 80)
    print(f"Total features: {all_features.shape[1] - 1} (excluding timestamp)")
    print(f"Total rows: {len(all_features):,}")
    # Check completeness: share of non-null cells (lags and rolling windows
    # necessarily leave nulls at the start of the series).
    null_count_total = all_features.null_count().sum_horizontal()[0]
    completeness = (1 - null_count_total / (all_features.shape[0] * all_features.shape[1])) * 100
    print(f"Completeness: {completeness:.2f}%")
    print()
    # Save features
    output_path = output_dir / 'features_weather_24month.parquet'
    all_features.write_parquet(output_path)
    file_size_mb = output_path.stat().st_size / (1024 ** 2)
    print(f"Features saved: {output_path}")
    print(f"File size: {file_size_mb:.2f} MB")
    print("=" * 80)
    print()
    return all_features
def main():
    """Entry point: engineer weather features from the raw parquet export."""
    # Resolve project-relative paths from the current working directory.
    project_root = Path.cwd()
    weather_path = project_root / 'data' / 'raw' / 'weather_24month.parquet'
    processed_dir = project_root / 'data' / 'processed'
    # Fail fast with a clear message if the raw extract is missing.
    if not weather_path.exists():
        raise FileNotFoundError(f"Weather data not found: {weather_path}")
    engineer_weather_features(weather_path, processed_dir)
    print("SUCCESS: Weather features engineered and saved to data/processed/")
# Allow running this module directly as a script.
if __name__ == '__main__':
    main()