"""
Convert manually exported Alegro outages to standardized parquet format.
After manually exporting from ENTSO-E web UI, run this script to convert
the CSV/Excel to our standard schema.
Usage:
python scripts/convert_alegro_manual_export.py data/raw/alegro_manual_export.csv
Expected columns in manual export (may vary):
- Asset Name / Resource Name
- Asset EIC / mRID
- Start Time / Unavailability Start
- End Time / Unavailability End
- Business Type / Type (A53=Planned, A54=Forced)
- Available Capacity / Unavailable Capacity (MW)
Author: Claude + Evgueni Poloukarov
Date: 2025-11-09
"""
import sys
from datetime import datetime, timezone
from pathlib import Path

import pandas as pd
import polars as pl
def convert_alegro_export(input_file: Path, output_path: Path) -> pl.DataFrame:
"""
Convert manually exported Alegro outages to standard schema.
Args:
input_file: Path to downloaded CSV/Excel file
output_path: Path to save standardized parquet
Returns:
Standardized outages DataFrame
"""
print("=" * 80)
print("CONVERTING MANUAL ALEGRO OUTAGE EXPORT")
print("=" * 80)
print(f"\nInput: {input_file}")
print()
# Read file (supports both CSV and Excel)
if input_file.suffix.lower() in ['.csv', '.txt']:
print("Reading CSV file...")
df = pl.read_csv(input_file)
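        # Hedged fallback: some ENTSO-E web exports are semicolon-separated;
        # if everything landed in a single column, retry with ';'.
        if len(df.columns) == 1:
            print("  Single column detected - retrying with ';' separator...")
            df = pl.read_csv(input_file, separator=';')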
elif input_file.suffix.lower() in ['.xlsx', '.xls']:
print("Reading Excel file...")
df_pandas = pd.read_excel(input_file)
df = pl.from_pandas(df_pandas)
else:
raise ValueError(f"Unsupported file format: {input_file.suffix}")
print(f" Loaded {len(df)} rows, {len(df.columns)} columns")
print(f" Columns: {df.columns}")
print()
# Show first few rows to help identify column names
print("Sample data:")
print(df.head(3))
print()
# Map columns to standard schema (flexible mapping)
column_mapping = {}
# Find asset EIC column
eic_candidates = [c for c in df.columns if any(x in c.lower() for x in ['eic', 'mrid', 'code', 'id'])]
if eic_candidates:
column_mapping['asset_eic'] = eic_candidates[0]
print(f"Mapped asset_eic <- {eic_candidates[0]}")
# Find asset name column
name_candidates = [c for c in df.columns if any(x in c.lower() for x in ['name', 'resource', 'asset'])]
if name_candidates:
column_mapping['asset_name'] = name_candidates[0]
print(f"Mapped asset_name <- {name_candidates[0]}")
# Find start time column
start_candidates = [c for c in df.columns if any(x in c.lower() for x in ['start', 'begin', 'from'])]
if start_candidates:
column_mapping['start_time'] = start_candidates[0]
print(f"Mapped start_time <- {start_candidates[0]}")
# Find end time column
end_candidates = [c for c in df.columns if any(x in c.lower() for x in ['end', 'to', 'until'])]
if end_candidates:
column_mapping['end_time'] = end_candidates[0]
print(f"Mapped end_time <- {end_candidates[0]}")
# Find business type column
type_candidates = [c for c in df.columns if any(x in c.lower() for x in ['type', 'business', 'category'])]
if type_candidates:
column_mapping['businesstype'] = type_candidates[0]
print(f"Mapped businesstype <- {type_candidates[0]}")
# Find capacity column (if available)
capacity_candidates = [c for c in df.columns if any(x in c.lower() for x in ['capacity', 'mw', 'power'])]
if capacity_candidates:
column_mapping['capacity_mw'] = capacity_candidates[0]
print(f"Mapped capacity_mw <- {capacity_candidates[0]}")
print()
if not column_mapping:
print("[ERROR] Could not automatically map columns!")
print("Please manually map columns in the script.")
print()
print("Available columns:")
for i, col in enumerate(df.columns, 1):
print(f" {i}. {col}")
sys.exit(1)
    # Select and rename the mapped columns to the standard schema
    # (every value in column_mapping comes from df.columns, so no fallback is needed)
    df_renamed = df.select([
        pl.col(original).alias(standard)
        for standard, original in column_mapping.items()
    ])
# Add missing columns with defaults
required_columns = {
'asset_eic': pl.Utf8,
'asset_name': pl.Utf8,
'start_time': pl.Datetime,
'end_time': pl.Datetime,
'businesstype': pl.Utf8,
'from_zone': pl.Utf8,
'to_zone': pl.Utf8,
'border': pl.Utf8
}
    for col, dtype in required_columns.items():
        if col not in df_renamed.columns:
            df_renamed = df_renamed.with_columns(pl.lit(None).cast(dtype).alias(col))
    # Set known values: ALEGrO is the HVDC link between Belgium and Germany,
    # so the border metadata can be hard-coded rather than inferred.
df_renamed = df_renamed.with_columns([
pl.lit('BE').alias('from_zone'),
pl.lit('DE').alias('to_zone'),
pl.lit('BE_DE').alias('border')
])
# Parse timestamps if they're strings
if df_renamed['start_time'].dtype == pl.Utf8:
df_renamed = df_renamed.with_columns(
pl.col('start_time').str.to_datetime().alias('start_time')
)
if df_renamed['end_time'].dtype == pl.Utf8:
df_renamed = df_renamed.with_columns(
pl.col('end_time').str.to_datetime().alias('end_time')
)
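    # Hedged sanity check (assumption: well-formed exports have end >= start).
    # If str.to_datetime() cannot infer the format, an explicit pattern such as
    # pl.col('start_time').str.to_datetime('%d.%m.%Y %H:%M') may be needed.
    invalid = df_renamed.filter(pl.col('end_time') < pl.col('start_time'))
    if len(invalid) > 0:
        print(f"[WARN] {len(invalid)} rows have end_time before start_time")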
    # Filter to only future outages (forward-looking for forecasting).
    # Use a naive UTC timestamp: the parsed datetimes are typically
    # timezone-naive, and comparing them against a tz-aware value can raise.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    df_future = df_renamed.filter(pl.col('end_time') > now)
print("=" * 80)
print("CONVERSION SUMMARY")
print("=" * 80)
print(f"Total outages in export: {len(df_renamed)}")
print(f"Future outages (for forecasting): {len(df_future)}")
print()
# Show business type breakdown
if 'businesstype' in df_renamed.columns:
type_counts = df_renamed.group_by('businesstype').agg(pl.len().alias('count'))
print("Business Type breakdown:")
for row in type_counts.iter_rows(named=True):
print(f" {row['businesstype']}: {row['count']} outages")
print()
# Save both full and future-only versions
output_path.parent.mkdir(parents=True, exist_ok=True)
# Save all outages
df_renamed.write_parquet(output_path)
print(f"[SAVED ALL] {output_path} ({len(df_renamed)} outages)")
# Save future outages separately
future_path = output_path.parent / output_path.name.replace('.parquet', '_future.parquet')
df_future.write_parquet(future_path)
print(f"[SAVED FUTURE] {future_path} ({len(df_future)} outages)")
print()
print("=" * 80)
print("[SUCCESS] Alegro outages converted successfully!")
print("=" * 80)
print()
print("Next steps:")
print("1. Verify the data looks correct:")
print(f" python -c \"import polars as pl; print(pl.read_parquet('{output_path}'))\"")
print("2. Integrate into feature engineering pipeline")
print()
return df_renamed
def main():
"""Main execution."""
if len(sys.argv) < 2:
print("Usage: python scripts/convert_alegro_manual_export.py <input_file>")
print()
print("Example:")
print(" python scripts/convert_alegro_manual_export.py data/raw/alegro_manual_export.csv")
print()
sys.exit(1)
input_file = Path(sys.argv[1])
if not input_file.exists():
print(f"[ERROR] File not found: {input_file}")
sys.exit(1)
    # Output path (assumes the script is run from the repository root)
    base_dir = Path.cwd()
    output_path = base_dir / 'data' / 'raw' / 'alegro_hvdc_outages_24month.parquet'
    # Convert (the return value is unused here; the saved files are the output)
    convert_alegro_export(input_file, output_path)
if __name__ == '__main__':
main()