""" Convert manually exported Alegro outages to standardized parquet format. After manually exporting from ENTSO-E web UI, run this script to convert the CSV/Excel to our standard schema. Usage: python scripts/convert_alegro_manual_export.py data/raw/alegro_manual_export.csv Expected columns in manual export (may vary): - Asset Name / Resource Name - Asset EIC / mRID - Start Time / Unavailability Start - End Time / Unavailability End - Business Type / Type (A53=Planned, A54=Forced) - Available Capacity / Unavailable Capacity (MW) Author: Claude + Evgueni Poloukarov Date: 2025-11-09 """ import sys from pathlib import Path import polars as pl import pandas as pd def convert_alegro_export(input_file: Path, output_path: Path) -> pl.DataFrame: """ Convert manually exported Alegro outages to standard schema. Args: input_file: Path to downloaded CSV/Excel file output_path: Path to save standardized parquet Returns: Standardized outages DataFrame """ print("=" * 80) print("CONVERTING MANUAL ALEGRO OUTAGE EXPORT") print("=" * 80) print(f"\nInput: {input_file}") print() # Read file (supports both CSV and Excel) if input_file.suffix.lower() in ['.csv', '.txt']: print("Reading CSV file...") df = pl.read_csv(input_file) elif input_file.suffix.lower() in ['.xlsx', '.xls']: print("Reading Excel file...") df_pandas = pd.read_excel(input_file) df = pl.from_pandas(df_pandas) else: raise ValueError(f"Unsupported file format: {input_file.suffix}") print(f" Loaded {len(df)} rows, {len(df.columns)} columns") print(f" Columns: {df.columns}") print() # Show first few rows to help identify column names print("Sample data:") print(df.head(3)) print() # Map columns to standard schema (flexible mapping) column_mapping = {} # Find asset EIC column eic_candidates = [c for c in df.columns if any(x in c.lower() for x in ['eic', 'mrid', 'code', 'id'])] if eic_candidates: column_mapping['asset_eic'] = eic_candidates[0] print(f"Mapped asset_eic <- {eic_candidates[0]}") # Find asset name column name_candidates = [c for c in df.columns if any(x in c.lower() for x in ['name', 'resource', 'asset'])] if name_candidates: column_mapping['asset_name'] = name_candidates[0] print(f"Mapped asset_name <- {name_candidates[0]}") # Find start time column start_candidates = [c for c in df.columns if any(x in c.lower() for x in ['start', 'begin', 'from'])] if start_candidates: column_mapping['start_time'] = start_candidates[0] print(f"Mapped start_time <- {start_candidates[0]}") # Find end time column end_candidates = [c for c in df.columns if any(x in c.lower() for x in ['end', 'to', 'until'])] if end_candidates: column_mapping['end_time'] = end_candidates[0] print(f"Mapped end_time <- {end_candidates[0]}") # Find business type column type_candidates = [c for c in df.columns if any(x in c.lower() for x in ['type', 'business', 'category'])] if type_candidates: column_mapping['businesstype'] = type_candidates[0] print(f"Mapped businesstype <- {type_candidates[0]}") # Find capacity column (if available) capacity_candidates = [c for c in df.columns if any(x in c.lower() for x in ['capacity', 'mw', 'power'])] if capacity_candidates: column_mapping['capacity_mw'] = capacity_candidates[0] print(f"Mapped capacity_mw <- {capacity_candidates[0]}") print() if not column_mapping: print("[ERROR] Could not automatically map columns!") print("Please manually map columns in the script.") print() print("Available columns:") for i, col in enumerate(df.columns, 1): print(f" {i}. {col}") sys.exit(1) # Rename columns df_renamed = df.select([ pl.col(original).alias(standard) if original in df.columns else pl.lit(None).alias(standard) for standard, original in column_mapping.items() ]) # Add missing columns with defaults required_columns = { 'asset_eic': pl.Utf8, 'asset_name': pl.Utf8, 'start_time': pl.Datetime, 'end_time': pl.Datetime, 'businesstype': pl.Utf8, 'from_zone': pl.Utf8, 'to_zone': pl.Utf8, 'border': pl.Utf8 } for col, dtype in required_columns.items(): if col not in df_renamed.columns: if dtype == pl.Datetime: df_renamed = df_renamed.with_columns(pl.lit(None).cast(pl.Datetime).alias(col)) else: df_renamed = df_renamed.with_columns(pl.lit(None).cast(dtype).alias(col)) # Set known values for Alegro df_renamed = df_renamed.with_columns([ pl.lit('BE').alias('from_zone'), pl.lit('DE').alias('to_zone'), pl.lit('BE_DE').alias('border') ]) # Parse timestamps if they're strings if df_renamed['start_time'].dtype == pl.Utf8: df_renamed = df_renamed.with_columns( pl.col('start_time').str.to_datetime().alias('start_time') ) if df_renamed['end_time'].dtype == pl.Utf8: df_renamed = df_renamed.with_columns( pl.col('end_time').str.to_datetime().alias('end_time') ) # Filter to only future outages (forward-looking for forecasting) now = pd.Timestamp.now(tz='UTC') df_future = df_renamed.filter(pl.col('end_time') > now) print("=" * 80) print("CONVERSION SUMMARY") print("=" * 80) print(f"Total outages in export: {len(df_renamed)}") print(f"Future outages (for forecasting): {len(df_future)}") print() # Show business type breakdown if 'businesstype' in df_renamed.columns: type_counts = df_renamed.group_by('businesstype').agg(pl.len().alias('count')) print("Business Type breakdown:") for row in type_counts.iter_rows(named=True): print(f" {row['businesstype']}: {row['count']} outages") print() # Save both full and future-only versions output_path.parent.mkdir(parents=True, exist_ok=True) # Save all outages df_renamed.write_parquet(output_path) print(f"[SAVED ALL] {output_path} ({len(df_renamed)} outages)") # Save future outages separately future_path = output_path.parent / output_path.name.replace('.parquet', '_future.parquet') df_future.write_parquet(future_path) print(f"[SAVED FUTURE] {future_path} ({len(df_future)} outages)") print() print("=" * 80) print("[SUCCESS] Alegro outages converted successfully!") print("=" * 80) print() print("Next steps:") print("1. Verify the data looks correct:") print(f" python -c \"import polars as pl; print(pl.read_parquet('{output_path}'))\"") print("2. Integrate into feature engineering pipeline") print() return df_renamed def main(): """Main execution.""" if len(sys.argv) < 2: print("Usage: python scripts/convert_alegro_manual_export.py ") print() print("Example:") print(" python scripts/convert_alegro_manual_export.py data/raw/alegro_manual_export.csv") print() sys.exit(1) input_file = Path(sys.argv[1]) if not input_file.exists(): print(f"[ERROR] File not found: {input_file}") sys.exit(1) # Output path base_dir = Path.cwd() output_path = base_dir / 'data' / 'raw' / 'alegro_hvdc_outages_24month.parquet' # Convert outages = convert_alegro_export(input_file, output_path) if __name__ == '__main__': main()