""" Collect Complete 24-Month ENTSO-E Dataset ========================================== Collects all ENTSO-E data for FBMC forecasting: - Generation by PSR type (8 types × 12 zones) - Demand (12 zones) - Day-ahead prices (12 zones) - Hydro reservoir storage (7 zones) - Pumped storage generation (7 zones) - Load forecasts (12 zones) - Asset-specific transmission outages (200 CNECs) - Generation outages by technology (5 types × 7 priority zones) Period: October 2023 - September 2025 (24 months) Estimated time: 4-6 hours with rate limiting (27 req/min) """ import sys from pathlib import Path import polars as pl from datetime import datetime # Add src to path sys.path.append(str(Path(__file__).parent.parent)) from src.data_collection.collect_entsoe import EntsoECollector, BIDDING_ZONES, PUMPED_STORAGE_ZONES, HYDRO_RESERVOIR_ZONES print("="*80) print("COMPLETE 24-MONTH ENTSO-E DATA COLLECTION") print("="*80) print() print("Period: October 2023 - September 2025") print("Target features: ~246-351 ENTSO-E features (including generation outages)") print() # Initialize collector (OPTIMIZED: 55 req/min = 92% of 60 limit, yearly chunks) collector = EntsoECollector(requests_per_minute=55) # Output directory output_dir = Path(__file__).parent.parent / 'data' / 'raw' output_dir.mkdir(parents=True, exist_ok=True) # Collection parameters START_DATE = '2023-10-01' END_DATE = '2025-09-30' # Key PSR types for generation (8 most important) KEY_PSR_TYPES = { 'B14': 'Nuclear', 'B04': 'Fossil Gas', 'B05': 'Fossil Hard coal', 'B06': 'Fossil Oil', 'B19': 'Wind Onshore', 'B16': 'Solar', 'B11': 'Hydro Run-of-river', 'B12': 'Hydro Water Reservoir' } results = {} # ============================================================================ # 1. 
# ============================================================================
# 1. Generation by PSR Type (8 types × 12 zones = 96 features)
# ============================================================================
print("-"*80)
print("[1/8] GENERATION BY PSR TYPE")
print("-"*80)
print()

# Resume support: reuse the parquet file if a previous run already wrote it.
gen_path = output_dir / "entsoe_generation_by_psr_24month.parquet"
if gen_path.exists():
    print(f"[SKIP] Generation data already exists at {gen_path}")
    print(f" File size: {gen_path.stat().st_size / (1024**2):.1f} MB")
    results['generation'] = gen_path
else:
    print("Collecting 8 PSR types for 12 FBMC zones...")
    print(f"PSR types: {', '.join(KEY_PSR_TYPES.values())}")
    print()

    generation_data = []
    # Flatten the zone × PSR-type grid so a single enumerate drives progress.
    combos = [
        (zone, code, name)
        for zone in BIDDING_ZONES.keys()
        for code, name in KEY_PSR_TYPES.items()
    ]
    total_queries = len(combos)
    start_time = datetime.now()

    for completed, (zone, psr_code, psr_name) in enumerate(combos, 1):
        print(f"[{completed}/{total_queries}] {zone} - {psr_name}...")
        try:
            df = collector.collect_generation_by_psr_type(
                zone=zone,
                psr_type=psr_code,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if df.is_empty():
                print(" - No data")
            else:
                generation_data.append(df)
                print(f" [OK] {len(df):,} records")
        except Exception as e:
            # Keep going: one failed zone/PSR query must not abort the run.
            print(f" [ERROR] {e}")

    if generation_data:
        generation_df = pl.concat(generation_data)
        generation_df.write_parquet(gen_path)
        results['generation'] = gen_path
        print()
        print(f"[SUCCESS] Generation: {len(generation_df):,} records -> {gen_path}")
        print(f" File size: {gen_path.stat().st_size / (1024**2):.1f} MB")

print()
# ============================================================================
# 2. Demand / Load (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[2/8] DEMAND / LOAD")
print("-"*80)
print()

# Resume support: reuse the parquet file if a previous run already wrote it.
load_path = output_dir / "entsoe_demand_24month.parquet"
if load_path.exists():
    print(f"[SKIP] Demand data already exists at {load_path}")
    print(f" File size: {load_path.stat().st_size / (1024**2):.1f} MB")
    results['demand'] = load_path
else:
    load_data = []
    zone_total = len(BIDDING_ZONES)
    for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
        print(f"[{i}/{zone_total}] {zone} demand...")
        try:
            df = collector.collect_load(
                zone=zone,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if df.is_empty():
                print(" - No data")
            else:
                load_data.append(df)
                print(f" [OK] {len(df):,} records")
        except Exception as e:
            # Per-zone failures are logged and skipped, not fatal.
            print(f" [ERROR] {e}")

    if load_data:
        load_df = pl.concat(load_data)
        load_df.write_parquet(load_path)
        results['demand'] = load_path
        print()
        print(f"[SUCCESS] Demand: {len(load_df):,} records -> {load_path}")
        print(f" File size: {load_path.stat().st_size / (1024**2):.1f} MB")

print()
# ============================================================================
# 3. Day-Ahead Prices (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[3/8] DAY-AHEAD PRICES")
print("-"*80)
print()

# CONSISTENCY FIX: sections 1-2 support resuming by skipping collection when
# the output parquet already exists, but sections 3-5 always re-collected,
# repeating hours of rate-limited API calls after any restart. Apply the
# same skip-if-exists pattern to prices, hydro storage and pumped storage.
prices_path = output_dir / "entsoe_prices_24month.parquet"
if prices_path.exists():
    print(f"[SKIP] Prices data already exists at {prices_path}")
    print(f" File size: {prices_path.stat().st_size / (1024**2):.1f} MB")
    results['prices'] = prices_path
else:
    prices_data = []
    for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
        print(f"[{i}/{len(BIDDING_ZONES)}] {zone} prices...")
        try:
            df = collector.collect_day_ahead_prices(
                zone=zone,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                prices_data.append(df)
                print(f" [OK] {len(df):,} records")
            else:
                print(f" - No data")
        except Exception as e:
            # Per-zone failures are logged and skipped, not fatal.
            print(f" [ERROR] {e}")

    if prices_data:
        prices_df = pl.concat(prices_data)
        prices_df.write_parquet(prices_path)
        results['prices'] = prices_path
        print()
        print(f"[SUCCESS] Prices: {len(prices_df):,} records -> {prices_path}")
        print(f" File size: {prices_path.stat().st_size / (1024**2):.1f} MB")

print()

# ============================================================================
# 4. Hydro Reservoir Storage (7 zones = 7 features)
# ============================================================================
print("-"*80)
print("[4/8] HYDRO RESERVOIR STORAGE")
print("-"*80)
print()
print(f"Collecting for {len(HYDRO_RESERVOIR_ZONES)} zones with significant hydro capacity...")
print()

hydro_path = output_dir / "entsoe_hydro_storage_24month.parquet"
if hydro_path.exists():
    print(f"[SKIP] Hydro storage data already exists at {hydro_path}")
    print(f" File size: {hydro_path.stat().st_size / (1024**2):.1f} MB")
    results['hydro_storage'] = hydro_path
else:
    hydro_data = []
    for i, zone in enumerate(HYDRO_RESERVOIR_ZONES, 1):
        print(f"[{i}/{len(HYDRO_RESERVOIR_ZONES)}] {zone} hydro storage...")
        try:
            # Reservoir filling levels come at weekly resolution from ENTSO-E.
            df = collector.collect_hydro_reservoir_storage(
                zone=zone,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                hydro_data.append(df)
                print(f" [OK] {len(df):,} records (weekly)")
            else:
                print(f" - No data")
        except Exception as e:
            print(f" [ERROR] {e}")

    if hydro_data:
        hydro_df = pl.concat(hydro_data)
        hydro_df.write_parquet(hydro_path)
        results['hydro_storage'] = hydro_path
        print()
        print(f"[SUCCESS] Hydro Storage: {len(hydro_df):,} records (weekly) -> {hydro_path}")
        print(f" File size: {hydro_path.stat().st_size / (1024**2):.1f} MB")
        print(f" Note: Will be interpolated to hourly in processing step")

print()

# ============================================================================
# 5. Pumped Storage Generation (7 zones = 7 features)
# ============================================================================
print("-"*80)
print("[5/8] PUMPED STORAGE GENERATION")
print("-"*80)
print()
print(f"Collecting for {len(PUMPED_STORAGE_ZONES)} zones...")
print("Note: Consumption data not available from ENTSO-E API (Phase 1 finding)")
print()

pumped_path = output_dir / "entsoe_pumped_storage_24month.parquet"
if pumped_path.exists():
    print(f"[SKIP] Pumped storage data already exists at {pumped_path}")
    print(f" File size: {pumped_path.stat().st_size / (1024**2):.1f} MB")
    results['pumped_storage'] = pumped_path
else:
    pumped_data = []
    for i, zone in enumerate(PUMPED_STORAGE_ZONES, 1):
        print(f"[{i}/{len(PUMPED_STORAGE_ZONES)}] {zone} pumped storage...")
        try:
            df = collector.collect_pumped_storage_generation(
                zone=zone,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                pumped_data.append(df)
                print(f" [OK] {len(df):,} records")
            else:
                print(f" - No data")
        except Exception as e:
            print(f" [ERROR] {e}")

    if pumped_data:
        pumped_df = pl.concat(pumped_data)
        pumped_df.write_parquet(pumped_path)
        results['pumped_storage'] = pumped_path
        print()
        print(f"[SUCCESS] Pumped Storage: {len(pumped_df):,} records -> {pumped_path}")
        print(f" File size: {pumped_path.stat().st_size / (1024**2):.1f} MB")

print()
# ============================================================================
# 6. Load Forecasts (12 zones = 12 features)
# ============================================================================
print("-"*80)
print("[6/8] LOAD FORECASTS")
print("-"*80)
print()

# CONSISTENCY FIX: add the same skip-if-exists resume support used by
# sections 1-2 so a restart does not repeat the rate-limited API queries.
forecast_path = output_dir / "entsoe_load_forecast_24month.parquet"
if forecast_path.exists():
    print(f"[SKIP] Load forecast data already exists at {forecast_path}")
    print(f" File size: {forecast_path.stat().st_size / (1024**2):.1f} MB")
    results['load_forecast'] = forecast_path
else:
    forecast_data = []
    for i, zone in enumerate(BIDDING_ZONES.keys(), 1):
        print(f"[{i}/{len(BIDDING_ZONES)}] {zone} load forecast...")
        try:
            df = collector.collect_load_forecast(
                zone=zone,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                forecast_data.append(df)
                print(f" [OK] {len(df):,} records")
            else:
                print(f" - No data")
        except Exception as e:
            # Per-zone failures are logged and skipped, not fatal.
            print(f" [ERROR] {e}")

    if forecast_data:
        forecast_df = pl.concat(forecast_data)
        forecast_df.write_parquet(forecast_path)
        results['load_forecast'] = forecast_path
        print()
        print(f"[SUCCESS] Load Forecast: {len(forecast_df):,} records -> {forecast_path}")
        print(f" File size: {forecast_path.stat().st_size / (1024**2):.1f} MB")

print()
# ============================================================================
# 7. Asset-Specific Transmission Outages (200 CNECs = 80-165 features expected)
# ============================================================================
print("-"*80)
print("[7/8] ASSET-SPECIFIC TRANSMISSION OUTAGES")
print("-"*80)
print()
print("Loading 200 CNEC EIC codes...")

try:
    # CNEC EIC list produced by an earlier processing step.
    cnec_file = Path(__file__).parent.parent / 'data' / 'processed' / 'critical_cnecs_all.csv'
    cnec_df = pl.read_csv(cnec_file)
    cnec_eics = cnec_df.get_column('cnec_eic').to_list()
    print(f"[OK] Loaded {len(cnec_eics)} CNEC EICs")
    print()
    print("Collecting asset-specific transmission outages...")
    print("Using Phase 1 validated XML parsing method")
    print("Querying all 22 FBMC borders...")
    print()

    outages_df = collector.collect_transmission_outages_asset_specific(
        cnec_eics=cnec_eics,
        start_date=START_DATE,
        end_date=END_DATE
    )

    if outages_df.is_empty():
        print()
        print(" Warning: No CNEC-matched outages found")
    else:
        outages_path = output_dir / "entsoe_transmission_outages_24month.parquet"
        outages_df.write_parquet(outages_path)
        results['transmission_outages'] = outages_path

        # Coverage: how many of the requested CNECs actually appeared.
        unique_cnecs = outages_df.get_column('asset_eic').n_unique()
        coverage_pct = unique_cnecs / len(cnec_eics) * 100
        print()
        print(f"[SUCCESS] Transmission Outages: {len(outages_df):,} records -> {outages_path}")
        print(f" File size: {outages_path.stat().st_size / (1024**2):.1f} MB")
        print(f" Unique CNECs matched: {unique_cnecs} / {len(cnec_eics)} ({coverage_pct:.1f}%)")

        # Per-border outage counts, busiest borders first.
        border_summary = (
            outages_df
            .group_by('border')
            .agg(pl.len().alias('outage_count'))
            .sort('outage_count', descending=True)
        )
        print()
        print(" Outages by border (top 10):")
        for row in border_summary.head(10).iter_rows(named=True):
            print(f" {row['border']}: {row['outage_count']:,} outages")
except Exception as e:
    # A single try wraps the whole section: CSV load or API failure is
    # reported and the script continues with the remaining sections.
    print(f"[ERROR] collecting transmission outages: {e}")

print()
# ============================================================================
# 8. Generation Outages by Technology (5 types × 7 priority zones = 20-30 features)
# ============================================================================
print("-"*80)
print("[8/8] GENERATION OUTAGES BY TECHNOLOGY")
print("-"*80)
print()
print("Collecting generation unit outages for priority zones with nuclear/fossil capacity...")
print()

# Priority zones with significant nuclear or fossil generation
NUCLEAR_ZONES = ['FR', 'BE', 'CZ', 'HU', 'RO', 'SI', 'SK']

# Technology types (PSR) prioritized by impact on cross-border flows
OUTAGE_PSR_TYPES = {
    'B14': 'Nuclear',              # Highest priority - large capacity, planned months ahead
    'B04': 'Fossil_Gas',           # Flexible generation affecting flow patterns
    'B05': 'Fossil_Hard_coal',
    'B02': 'Fossil_Brown_coal_Lignite',
    'B06': 'Fossil_Oil'
}

gen_outages_data = []
total_combos = len(NUCLEAR_ZONES) * len(OUTAGE_PSR_TYPES)
combo_count = 0

for zone in NUCLEAR_ZONES:
    for psr_code, psr_name in OUTAGE_PSR_TYPES.items():
        combo_count += 1
        print(f"[{combo_count}/{total_combos}] {zone} - {psr_name}...")
        try:
            df = collector.collect_generation_outages(
                zone=zone,
                psr_type=psr_code,
                start_date=START_DATE,
                end_date=END_DATE
            )
            if not df.is_empty():
                gen_outages_data.append(df)
                total_capacity = df.select('capacity_mw').sum().item()
                print(f" [OK] {len(df):,} outages ({total_capacity:,.0f} MW affected)")
            else:
                print(f" - No outages")
        except Exception as e:
            # Per-combination failures are logged and skipped, not fatal.
            print(f" [ERROR] {e}")

if gen_outages_data:
    gen_outages_df = pl.concat(gen_outages_data)
    gen_outages_path = output_dir / "entsoe_generation_outages_24month.parquet"
    gen_outages_df.write_parquet(gen_outages_path)
    results['generation_outages'] = gen_outages_path

    # Count distinct zone/technology pairs actually observed in the data.
    unique_combos = gen_outages_df.select(
        (pl.col('zone') + "_" + pl.col('psr_name')).alias('zone_tech')
    ).n_unique()

    print()
    print(f"[SUCCESS] Generation Outages: {len(gen_outages_df):,} records -> {gen_outages_path}")
    print(f" File size: {gen_outages_path.stat().st_size / (1024**2):.1f} MB")
    print(f" Unique zone-technology combinations: {unique_combos}")
    print(f" Features expected: {unique_combos * 2} (binary + MW for each)")

    # Show technology summary
    tech_summary = gen_outages_df.group_by('psr_name').agg([
        pl.len().alias('outage_count'),
        pl.col('capacity_mw').sum().alias('total_capacity_mw')
    ]).sort('total_capacity_mw', descending=True)

    print()
    print(" Outages by technology:")
    for row in tech_summary.iter_rows(named=True):
        print(f" {row['psr_name']}: {row['outage_count']:,} outages, {row['total_capacity_mw']:,.0f} MW")
else:
    print()
    print(" Warning: No generation outages found")
    print(" This may be normal if no outages occurred in 24-month period")

print()

# ============================================================================
# SUMMARY
# ============================================================================
end_time = datetime.now()
# BUGFIX: the original assigned start_time only inside the "collect" branch
# of section 1, so when the generation parquet already existed this line
# raised NameError. Fall back to a zero duration if the timer was never set.
total_time = end_time - globals().get('start_time', end_time)

print("="*80)
print("24-MONTH ENTSO-E COLLECTION COMPLETE")
print("="*80)
print()
print(f"Total time: {total_time}")
print(f"Files created: {len(results)}")
print()

# Per-file sizes plus an on-disk grand total (MB).
total_size = 0
for data_type, path in results.items():
    file_size = path.stat().st_size / (1024**2)
    total_size += file_size
    print(f" {data_type}: {file_size:.1f} MB")

print()
print(f"Total data size: {total_size:.1f} MB")
print()
print("Output directory: data/raw/")
print()
print("Next steps:")
print(" 1. Run process_entsoe_features.py to:")
print(" - Encode transmission outages to hourly binary")
print(" - Encode generation outages to hourly (binary + MW)")
print(" - Interpolate hydro weekly storage to hourly")
print(" 2. Merge all ENTSO-E features into single matrix")
print(" 3. Combine with JAO features (726) -> ~972-1,077 total features")
print()
print("="*80)