| """ | |
| Convert manually exported Alegro outages to standardized parquet format. | |
| After manually exporting from ENTSO-E web UI, run this script to convert | |
| the CSV/Excel to our standard schema. | |
| Usage: | |
| python scripts/convert_alegro_manual_export.py data/raw/alegro_manual_export.csv | |
| Expected columns in manual export (may vary): | |
| - Asset Name / Resource Name | |
| - Asset EIC / mRID | |
| - Start Time / Unavailability Start | |
| - End Time / Unavailability End | |
| - Business Type / Type (A53=Planned, A54=Forced) | |
| - Available Capacity / Unavailable Capacity (MW) | |
| Author: Claude + Evgueni Poloukarov | |
| Date: 2025-11-09 | |
| """ | |
import sys
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd
import polars as pl


def convert_alegro_export(input_file: Path, output_path: Path) -> pl.DataFrame:
    """
    Convert manually exported Alegro outages to the standard schema.

    Args:
        input_file: Path to the downloaded CSV/Excel file.
        output_path: Path to save the standardized parquet.

    Returns:
        Standardized outages DataFrame (all outages, including past ones).
    """
    print("=" * 80)
    print("CONVERTING MANUAL ALEGRO OUTAGE EXPORT")
    print("=" * 80)
    print(f"\nInput: {input_file}")
    print()
    # Read file (supports both CSV and Excel)
    if input_file.suffix.lower() in ['.csv', '.txt']:
        print("Reading CSV file...")
        df = pl.read_csv(input_file)
    elif input_file.suffix.lower() in ['.xlsx', '.xls']:
        print("Reading Excel file...")
        df = pl.from_pandas(pd.read_excel(input_file))
    else:
        raise ValueError(f"Unsupported file format: {input_file.suffix}")
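
    # NOTE (assumption): ENTSO-E web exports are not always comma-separated.
    # If the CSV loads as a single wide column, re-read it with an explicit
    # separator, e.g. pl.read_csv(input_file, separator=';').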
| print(f" Loaded {len(df)} rows, {len(df.columns)} columns") | |
| print(f" Columns: {df.columns}") | |
| print() | |
| # Show first few rows to help identify column names | |
| print("Sample data:") | |
| print(df.head(3)) | |
| print() | |
    # Map columns to the standard schema (flexible, keyword-based matching)
    column_mapping = {}

    # Find the asset EIC column
    eic_candidates = [c for c in df.columns if any(x in c.lower() for x in ['eic', 'mrid', 'code', 'id'])]
    if eic_candidates:
        column_mapping['asset_eic'] = eic_candidates[0]
        print(f"Mapped asset_eic <- {eic_candidates[0]}")

    # Find the asset name column
    name_candidates = [c for c in df.columns if any(x in c.lower() for x in ['name', 'resource', 'asset'])]
    if name_candidates:
        column_mapping['asset_name'] = name_candidates[0]
        print(f"Mapped asset_name <- {name_candidates[0]}")

    # Find the start time column
    start_candidates = [c for c in df.columns if any(x in c.lower() for x in ['start', 'begin', 'from'])]
    if start_candidates:
        column_mapping['start_time'] = start_candidates[0]
        print(f"Mapped start_time <- {start_candidates[0]}")

    # Find the end time column
    end_candidates = [c for c in df.columns if any(x in c.lower() for x in ['end', 'to', 'until'])]
    if end_candidates:
        column_mapping['end_time'] = end_candidates[0]
        print(f"Mapped end_time <- {end_candidates[0]}")

    # Find the business type column
    type_candidates = [c for c in df.columns if any(x in c.lower() for x in ['type', 'business', 'category'])]
    if type_candidates:
        column_mapping['businesstype'] = type_candidates[0]
        print(f"Mapped businesstype <- {type_candidates[0]}")

    # Find the capacity column (if available)
    capacity_candidates = [c for c in df.columns if any(x in c.lower() for x in ['capacity', 'mw', 'power'])]
    if capacity_candidates:
        column_mapping['capacity_mw'] = capacity_candidates[0]
        print(f"Mapped capacity_mw <- {capacity_candidates[0]}")
    print()

    if not column_mapping:
        print("[ERROR] Could not automatically map columns!")
        print("Please manually map columns in the script.")
        print()
        print("Available columns:")
        for i, col in enumerate(df.columns, 1):
            print(f" {i}. {col}")
        sys.exit(1)

    # Rename the mapped columns to the standard schema
    df_renamed = df.select([
        pl.col(original).alias(standard)
        for standard, original in column_mapping.items()
    ])
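
    # NOTE: only the mapped columns survive the select above; any standard
    # columns that were not found are added as nulls in the next step.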

    # Add missing columns with defaults
    required_columns = {
        'asset_eic': pl.Utf8,
        'asset_name': pl.Utf8,
        'start_time': pl.Datetime,
        'end_time': pl.Datetime,
        'businesstype': pl.Utf8,
        'from_zone': pl.Utf8,
        'to_zone': pl.Utf8,
        'border': pl.Utf8,
    }
    for col, dtype in required_columns.items():
        if col not in df_renamed.columns:
            df_renamed = df_renamed.with_columns(pl.lit(None).cast(dtype).alias(col))

    # Set known values for Alegro
    df_renamed = df_renamed.with_columns([
        pl.lit('BE').alias('from_zone'),
        pl.lit('DE').alias('to_zone'),
        pl.lit('BE_DE').alias('border'),
    ])
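
    # NOTE: Alegro (ALEGrO) is the HVDC interconnector between Belgium and
    # Germany, which is why the zones and border label can be hard-coded above.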

    # Parse timestamps if they arrived as strings
    if df_renamed['start_time'].dtype == pl.Utf8:
        df_renamed = df_renamed.with_columns(
            pl.col('start_time').str.to_datetime().alias('start_time')
        )
    if df_renamed['end_time'].dtype == pl.Utf8:
        df_renamed = df_renamed.with_columns(
            pl.col('end_time').str.to_datetime().alias('end_time')
        )
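
    # NOTE (assumption): str.to_datetime() infers the timestamp format. If the
    # export uses a localized format such as "09.11.2025 00:00", pass it
    # explicitly, e.g. pl.col('end_time').str.to_datetime("%d.%m.%Y %H:%M").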

    # Filter to only future outages (forward-looking for forecasting).
    # Match the timezone-awareness of the parsed column: polars raises if a
    # tz-naive column is compared against a tz-aware timestamp.
    now = datetime.now(timezone.utc)
    if df_renamed.schema['end_time'].time_zone is None:
        now = now.replace(tzinfo=None)
    df_future = df_renamed.filter(pl.col('end_time') > now)
| print("=" * 80) | |
| print("CONVERSION SUMMARY") | |
| print("=" * 80) | |
| print(f"Total outages in export: {len(df_renamed)}") | |
| print(f"Future outages (for forecasting): {len(df_future)}") | |
| print() | |
| # Show business type breakdown | |
| if 'businesstype' in df_renamed.columns: | |
| type_counts = df_renamed.group_by('businesstype').agg(pl.len().alias('count')) | |
| print("Business Type breakdown:") | |
| for row in type_counts.iter_rows(named=True): | |
| print(f" {row['businesstype']}: {row['count']} outages") | |
| print() | |

    # Save both the full and the future-only versions
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # Save all outages
    df_renamed.write_parquet(output_path)
    print(f"[SAVED ALL] {output_path} ({len(df_renamed)} outages)")

    # Save future outages separately
    future_path = output_path.parent / output_path.name.replace('.parquet', '_future.parquet')
    df_future.write_parquet(future_path)
    print(f"[SAVED FUTURE] {future_path} ({len(df_future)} outages)")
    print()

    print("=" * 80)
    print("[SUCCESS] Alegro outages converted successfully!")
    print("=" * 80)
    print()
    print("Next steps:")
    print("1. Verify the data looks correct:")
    print(f" python -c \"import polars as pl; print(pl.read_parquet('{output_path}'))\"")
    print("2. Integrate into the feature engineering pipeline")
    print()

    return df_renamed
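
# A minimal programmatic usage sketch (the CLI in main() below is the normal
# entry point; the input path here is illustrative):
#
#     df = convert_alegro_export(
#         Path('data/raw/alegro_manual_export.csv'),
#         Path('data/raw/alegro_hvdc_outages_24month.parquet'),
#     )
#     df.filter(pl.col('businesstype') == 'A54')  # forced outages only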


def main():
    """Main execution."""
    if len(sys.argv) < 2:
        print("Usage: python scripts/convert_alegro_manual_export.py <input_file>")
        print()
        print("Example:")
        print(" python scripts/convert_alegro_manual_export.py data/raw/alegro_manual_export.csv")
        print()
        sys.exit(1)

    input_file = Path(sys.argv[1])
    if not input_file.exists():
        print(f"[ERROR] File not found: {input_file}")
        sys.exit(1)

    # Output path for the standardized parquet
    output_path = Path.cwd() / 'data' / 'raw' / 'alegro_hvdc_outages_24month.parquet'

    # Convert (the returned DataFrame is also written to disk)
    convert_alegro_export(input_file, output_path)


if __name__ == '__main__':
    main()