| """ENTSO-E Transparency Platform Data Collection with Rate Limiting | |
| Collects generation, load, and cross-border flow data from ENTSO-E API. | |
| Implements proper rate limiting to avoid temporary bans. | |
| ENTSO-E Rate Limits (OFFICIAL): | |
| - 60 requests per 60 seconds (hard limit - exceeding triggers 10-min ban) | |
| - Screen scraping >60 requests/min leads to temporary IP ban | |
| Strategy: | |
| - 27 requests/minute (45% of 60 limit - safe) | |
| - 1 request every ~2.2 seconds | |
| - Request data in monthly chunks to minimize API calls | |
| """ | |
import polars as pl
from pathlib import Path
from datetime import datetime, timedelta
from dotenv import load_dotenv
import os
import time
from typing import List, Tuple
from tqdm import tqdm
from entsoe import EntsoePandasClient
import pandas as pd
import zipfile
from io import BytesIO
import xml.etree.ElementTree as ET

# Load environment variables
load_dotenv()
# FBMC Bidding Zones (12 zones from project plan)
BIDDING_ZONES = {
    'AT': 'Austria',
    'BE': 'Belgium',
    'HR': 'Croatia',
    'CZ': 'Czech Republic',
    'FR': 'France',
    'DE_LU': 'Germany-Luxembourg',
    'HU': 'Hungary',
    'NL': 'Netherlands',
    'PL': 'Poland',
    'RO': 'Romania',
    'SK': 'Slovakia',
    'SI': 'Slovenia',
}

# FBMC Cross-Border Flows (~20 major borders)
BORDERS = [
    ('DE_LU', 'NL'),
    ('DE_LU', 'FR'),
    ('DE_LU', 'BE'),
    ('DE_LU', 'AT'),
    ('DE_LU', 'CZ'),
    ('DE_LU', 'PL'),
    ('FR', 'BE'),
    ('FR', 'ES'),  # External but affects FBMC
    ('FR', 'CH'),  # External but affects FBMC
    ('AT', 'CZ'),
    ('AT', 'HU'),
    ('AT', 'SI'),
    ('AT', 'CH'),  # External but affects FBMC
    ('CZ', 'SK'),
    ('CZ', 'PL'),
    ('HU', 'SK'),
    ('HU', 'RO'),
    ('HU', 'HR'),
    ('SI', 'HR'),
    ('PL', 'SK'),
    ('PL', 'CZ'),
]

# FBMC Bidding Zone EIC Codes (for asset-specific outages)
BIDDING_ZONE_EICS = {
    'AT': '10YAT-APG------L',
    'BE': '10YBE----------2',
    'HR': '10YHR-HEP------M',
    'CZ': '10YCZ-CEPS-----N',
    'FR': '10YFR-RTE------C',
    'DE_LU': '10Y1001A1001A82H',
    'HU': '10YHU-MAVIR----U',
    'NL': '10YNL----------L',
    'PL': '10YPL-AREA-----S',
    'RO': '10YRO-TEL------P',
    'SK': '10YSK-SEPS-----K',
    'SI': '10YSI-ELES-----O',
    'CH': '10YCH-SWISSGRIDZ',
}

# PSR Types for generation data collection
PSR_TYPES = {
    'B01': 'Biomass',
    'B02': 'Fossil Brown coal/Lignite',
    'B03': 'Fossil Coal-derived gas',
    'B04': 'Fossil Gas',
    'B05': 'Fossil Hard coal',
    'B06': 'Fossil Oil',
    'B07': 'Fossil Oil shale',
    'B08': 'Fossil Peat',
    'B09': 'Geothermal',
    'B10': 'Hydro Pumped Storage',
    'B11': 'Hydro Run-of-river and poundage',
    'B12': 'Hydro Water Reservoir',
    'B13': 'Marine',
    'B14': 'Nuclear',
    'B15': 'Other renewable',
    'B16': 'Solar',
    'B17': 'Waste',
    'B18': 'Wind Offshore',
    'B19': 'Wind Onshore',
    'B20': 'Other',
}

# Zones with significant pumped storage capacity
PUMPED_STORAGE_ZONES = ['CH', 'AT', 'DE_LU', 'FR', 'HU', 'PL', 'RO']

# Zones with significant hydro reservoir capacity
HYDRO_RESERVOIR_ZONES = ['CH', 'AT', 'FR', 'RO', 'SI', 'HR', 'SK']

# Zones with nuclear generation
NUCLEAR_ZONES = ['FR', 'BE', 'CZ', 'HU', 'RO', 'SI', 'SK']
class EntsoECollector:
    """Collect ENTSO-E data with proper rate limiting."""

    def __init__(self, requests_per_minute: int = 27):
        """Initialize collector with rate limiting.

        Args:
            requests_per_minute: Max requests per minute (default: 27 = 45% of 60 limit)
        """
        api_key = os.getenv('ENTSOE_API_KEY')
        if not api_key or 'your_entsoe' in api_key.lower():
            raise ValueError("ENTSO-E API key not configured in .env file")

        self.client = EntsoePandasClient(api_key=api_key)
        self.requests_per_minute = requests_per_minute
        self.delay_seconds = 60.0 / requests_per_minute
        self.request_count = 0

        print("ENTSO-E Collector initialized")
        print(f"Rate limit: {self.requests_per_minute} requests/minute")
        print(f"Delay between requests: {self.delay_seconds:.2f}s")

    def _rate_limit(self):
        """Apply rate limiting delay."""
        time.sleep(self.delay_seconds)
        self.request_count += 1
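    # Illustrative sketch of the pacing above (comments only, values assumed,
    # not executed at import time; a live ENTSOE_API_KEY is needed to build the
    # collector). With the default budget of 27 requests/minute, _rate_limit()
    # sleeps 60 / 27 ≈ 2.22 s before counting each request, so a burst of N
    # calls takes roughly N * 2.22 s of wall time and stays well under the
    # 60-requests-per-60-seconds ban threshold:
    #
    #   collector = EntsoECollector(requests_per_minute=27)
    #   for _ in range(54):              # two minutes' worth of budget
    #       collector._rate_limit()      # ~54 * 2.22 s ≈ 120 s total
    #   print(collector.request_count)   # 54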
    def _generate_monthly_chunks(
        self,
        start_date: str,
        end_date: str
    ) -> List[Tuple[pd.Timestamp, pd.Timestamp]]:
        """Generate monthly date chunks for API requests.

        For most data types, the ENTSO-E API supports up to 1 year per request.
        However, for generation outages (A77), large nuclear fleets can have
        hundreds of outage documents per year, exceeding the 200-element limit.
        Monthly chunks keep each request under the API pagination limits
        while balancing API call efficiency.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            List of (start, end) timestamp tuples (monthly periods)
        """
        start_dt = pd.Timestamp(start_date, tz='UTC')
        end_dt = pd.Timestamp(end_date, tz='UTC')

        chunks = []
        current = start_dt
        while current < end_dt:
            # End each chunk at 23:59:59 of the current month's last day,
            # or at end_date, whichever is earlier
            month_end = (current + pd.offsets.MonthEnd(1)).replace(hour=23, minute=59, second=59)
            chunk_end = min(month_end, end_dt)
            chunks.append((current, chunk_end))

            # Start the next chunk at midnight of the following month
            # (adding one second to 23:59:59 avoids skipping the first hour)
            current = chunk_end + pd.Timedelta(seconds=1)

        return chunks
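    # Illustrative sketch of the chunking above (comments only; constructing the
    # collector requires ENTSOE_API_KEY, so this is not executed here). For a
    # quarter-long range the helper yields one (start, end) pair per month:
    #
    #   chunks = collector._generate_monthly_chunks('2024-10-01', '2024-12-31')
    #   # [(2024-10-01 00:00:00+00:00, 2024-10-31 23:59:59+00:00),
    #   #  (2024-11-01 00:00:00+00:00, 2024-11-30 23:59:59+00:00),
    #   #  (2024-12-01 00:00:00+00:00, 2024-12-31 00:00:00+00:00)]
    #
    # The final chunk is clipped to end_date rather than the true month end.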
    def _generate_weekly_chunks(
        self,
        start_date: str,
        end_date: str
    ) -> List[Tuple[pd.Timestamp, pd.Timestamp]]:
        """Generate weekly date chunks for API requests.

        For generation outages (A77), even monthly chunks can exceed the
        200-element limit for high-activity zones (France nuclear: 228-263
        documents per month). Weekly chunks ensure reliable data collection:
        - ~30-60 documents per week (well under the 200 limit)
        - Handles peak outage periods (spring/summer maintenance)

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            List of (start, end) timestamp tuples (weekly periods)
        """
        start_dt = pd.Timestamp(start_date, tz='UTC')
        end_dt = pd.Timestamp(end_date, tz='UTC')

        chunks = []
        current = start_dt
        while current < end_dt:
            # End each chunk 6 days after its start at 23:59:59, giving rolling
            # 7-day windows anchored on start_date (not calendar weeks)
            week_end = (current + pd.Timedelta(days=6)).replace(hour=23, minute=59, second=59)
            chunk_end = min(week_end, end_dt)
            chunks.append((current, chunk_end))

            # Start the next chunk at midnight of the following day
            # (adding one second to 23:59:59 avoids skipping the first hour)
            current = chunk_end + pd.Timedelta(seconds=1)

        return chunks
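    # Rough request-budget arithmetic for the weekly chunking above (assumed
    # figures for illustration only): a 12-month window yields ~52 weekly chunks
    # per zone and PSR type, so collecting A77 outages for one zone costs ~52
    # requests, i.e. roughly 2 minutes at the default 27 requests/minute budget.
    # At ~30-60 documents per week, even peak French nuclear maintenance periods
    # stay under the 200-document pagination cap that monthly chunks can exceed.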
    def collect_generation_per_type(
        self,
        zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect generation by production type for a bidding zone.

        Args:
            zone: Bidding zone code (e.g., 'DE_LU', 'FR')
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with generation data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} generation", leave=False):
            try:
                # Fetch generation data
                df = self.client.query_generation(
                    zone,
                    start=start_chunk,
                    end=end_chunk,
                    psr_type=None  # Get all production types
                )

                if df is not None and not df.empty:
                    # Convert to long format
                    df_reset = df.reset_index()
                    df_melted = df_reset.melt(
                        id_vars=['index'],
                        var_name='production_type',
                        value_name='generation_mw'
                    )
                    df_melted = df_melted.rename(columns={'index': 'timestamp'})
                    df_melted['zone'] = zone

                    # Convert to Polars
                    pl_df = pl.from_pandas(df_melted)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f" ❌ Failed {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
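    # Hedged usage sketch for the method above (comments only; requires a
    # configured ENTSOE_API_KEY and live API access; the date range is assumed
    # purely for illustration):
    #
    #   gen_df = collector.collect_generation_per_type('DE_LU', '2024-10-01', '2024-10-31')
    #   # Long-format Polars frame with columns:
    #   #   timestamp | production_type | generation_mw | zone
    #   # One row per (timestamp, production type), e.g. 'Solar', 'Wind Onshore'.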
    def collect_load(
        self,
        zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect load (demand) data for a bidding zone.

        Args:
            zone: Bidding zone code
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with load data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} load", leave=False):
            try:
                # Fetch load data
                series = self.client.query_load(
                    zone,
                    start=start_chunk,
                    end=end_chunk
                )

                if series is not None and not series.empty:
                    # Handle both Series and DataFrame returns
                    if isinstance(series, pd.DataFrame):
                        series = series.iloc[:, 0]

                    # Convert the timestamp index to UTC and drop the timezone
                    # to avoid timezone mismatches on concat
                    timestamp_index = series.index
                    if hasattr(timestamp_index, 'tz_convert'):
                        timestamp_index = timestamp_index.tz_convert('UTC').tz_localize(None)

                    df = pd.DataFrame({
                        'timestamp': timestamp_index,
                        'load_mw': series.values,
                        'zone': zone
                    })

                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f" [ERROR] Failed {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
    def collect_cross_border_flows(
        self,
        from_zone: str,
        to_zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect cross-border flow data between two zones.

        Args:
            from_zone: From bidding zone
            to_zone: To bidding zone
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with flow data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []
        border_id = f"{from_zone}_{to_zone}"

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {border_id}", leave=False):
            try:
                # Fetch cross-border flow
                series = self.client.query_crossborder_flows(
                    from_zone,
                    to_zone,
                    start=start_chunk,
                    end=end_chunk
                )

                if series is not None and not series.empty:
                    df = pd.DataFrame({
                        'timestamp': series.index,
                        'flow_mw': series.values,
                        'from_zone': from_zone,
                        'to_zone': to_zone,
                        'border': border_id
                    })

                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f" ❌ Failed {border_id} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
    def collect_transmission_outages_asset_specific(
        self,
        cnec_eics: List[str],
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect asset-specific transmission outages using XML parsing.

        Uses the validated Phase 1C/1D methodology: query border-level outages,
        parse the ZIP/XML response to extract Asset_RegisteredResource.mRID
        elements, and filter to the CNEC EIC codes.

        Args:
            cnec_eics: List of CNEC EIC codes to filter (e.g., 200 critical CNECs)
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with outage events.
            Columns: asset_eic, asset_name, start_time, end_time,
                     businesstype, from_zone, to_zone, border
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_outages = []

        # Query all FBMC borders for transmission outages
        for zone1, zone2 in tqdm(BORDERS, desc="Transmission outages (borders)"):
            zone1_eic = BIDDING_ZONE_EICS.get(zone1)
            zone2_eic = BIDDING_ZONE_EICS.get(zone2)

            if not zone1_eic or not zone2_eic:
                continue

            for start_chunk, end_chunk in chunks:
                try:
                    # Query border-level outages (raw bytes)
                    response = self.client._base_request(
                        params={
                            'documentType': 'A78',  # Transmission unavailability
                            'in_Domain': zone2_eic,
                            'out_Domain': zone1_eic
                        },
                        start=start_chunk,
                        end=end_chunk
                    )
                    outages_zip = response.content

                    # Parse ZIP and extract Asset_RegisteredResource.mRID
                    with zipfile.ZipFile(BytesIO(outages_zip), 'r') as zf:
                        xml_files = [f for f in zf.namelist() if f.endswith('.xml')]

                        for xml_file in xml_files:
                            with zf.open(xml_file) as xf:
                                xml_content = xf.read()
                                root = ET.fromstring(xml_content)

                                # Get the default namespace
                                nsmap = dict([node for _, node in ET.iterparse(
                                    BytesIO(xml_content), events=['start-ns']
                                )])
                                ns_uri = nsmap.get('', None)

                                # Find TimeSeries elements
                                if ns_uri:
                                    timeseries_found = root.findall('.//{' + ns_uri + '}TimeSeries')
                                else:
                                    timeseries_found = root.findall('.//TimeSeries')

                                for ts in timeseries_found:
                                    # Extract Asset_RegisteredResource.mRID
                                    if ns_uri:
                                        reg_resource = ts.find('.//{' + ns_uri + '}Asset_RegisteredResource')
                                    else:
                                        reg_resource = ts.find('.//Asset_RegisteredResource')

                                    if reg_resource is not None:
                                        # Get asset EIC
                                        if ns_uri:
                                            mrid_elem = reg_resource.find('.//{' + ns_uri + '}mRID')
                                            name_elem = reg_resource.find('.//{' + ns_uri + '}name')
                                        else:
                                            mrid_elem = reg_resource.find('.//mRID')
                                            name_elem = reg_resource.find('.//name')

                                        if mrid_elem is not None:
                                            asset_eic = mrid_elem.text

                                            # Filter to the CNEC list
                                            if asset_eic in cnec_eics:
                                                asset_name = name_elem.text if name_elem is not None else ''

                                                # Extract outage periods
                                                if ns_uri:
                                                    periods = ts.findall('.//{' + ns_uri + '}Available_Period')
                                                else:
                                                    periods = ts.findall('.//Available_Period')

                                                for period in periods:
                                                    if ns_uri:
                                                        time_interval = period.find('.//{' + ns_uri + '}timeInterval')
                                                    else:
                                                        time_interval = period.find('.//timeInterval')

                                                    if time_interval is not None:
                                                        if ns_uri:
                                                            start_elem = time_interval.find('.//{' + ns_uri + '}start')
                                                            end_elem = time_interval.find('.//{' + ns_uri + '}end')
                                                        else:
                                                            start_elem = time_interval.find('.//start')
                                                            end_elem = time_interval.find('.//end')

                                                        if start_elem is not None and end_elem is not None:
                                                            # Extract business type from the document root
                                                            if ns_uri:
                                                                business_type_elem = root.find('.//{' + ns_uri + '}businessType')
                                                            else:
                                                                business_type_elem = root.find('.//businessType')
                                                            business_type = business_type_elem.text if business_type_elem is not None else 'Unknown'

                                                            all_outages.append({
                                                                'asset_eic': asset_eic,
                                                                'asset_name': asset_name,
                                                                'start_time': pd.Timestamp(start_elem.text),
                                                                'end_time': pd.Timestamp(end_elem.text),
                                                                'businesstype': business_type,
                                                                'from_zone': zone1,
                                                                'to_zone': zone2,
                                                                'border': f"{zone1}_{zone2}"
                                                            })

                    self._rate_limit()

                except Exception as e:
                    # Empty response or no outages is OK
                    if "empty" not in str(e).lower():
                        print(f" Warning: {zone1}->{zone2} {start_chunk.date()}: {e}")
                    self._rate_limit()
                    continue

        if all_outages:
            return pl.DataFrame(all_outages)
        else:
            return pl.DataFrame()
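    # Hedged usage sketch for the method above (comments only). The EIC codes
    # below are placeholders, not real CNEC identifiers - in practice the list
    # comes from the project's critical-CNEC selection:
    #
    #   cnec_eics = ['10T-EXAMPLE-00001', '10T-EXAMPLE-00002']  # hypothetical
    #   outages = collector.collect_transmission_outages_asset_specific(
    #       cnec_eics, '2024-10-01', '2025-09-30'
    #   )
    #   # Columns: asset_eic, asset_name, start_time, end_time,
    #   #          businesstype, from_zone, to_zone, border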
    def collect_day_ahead_prices(
        self,
        zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect day-ahead electricity prices.

        Args:
            zone: Bidding zone code
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with price data
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} prices", leave=False):
            try:
                series = self.client.query_day_ahead_prices(
                    zone,
                    start=start_chunk,
                    end=end_chunk
                )

                if series is not None and not series.empty:
                    # Handle both Series and DataFrame returns
                    if isinstance(series, pd.DataFrame):
                        series = series.iloc[:, 0]

                    # Convert the timestamp index to UTC and drop the timezone
                    # to avoid timezone mismatches on concat
                    timestamp_index = series.index
                    if hasattr(timestamp_index, 'tz_convert'):
                        timestamp_index = timestamp_index.tz_convert('UTC').tz_localize(None)

                    df = pd.DataFrame({
                        'timestamp': timestamp_index,
                        'price_eur_mwh': series.values,
                        'zone': zone
                    })

                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f" Warning: {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
    def collect_hydro_reservoir_storage(
        self,
        zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect hydro reservoir storage levels (weekly data).

        Args:
            zone: Bidding zone code
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with reservoir storage data (weekly)
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} hydro storage", leave=False):
            try:
                series = self.client.query_aggregate_water_reservoirs_and_hydro_storage(
                    zone,
                    start=start_chunk,
                    end=end_chunk
                )

                if series is not None and not series.empty:
                    # Handle both Series and DataFrame returns
                    if isinstance(series, pd.DataFrame):
                        series = series.iloc[:, 0]

                    # Convert the timestamp index to UTC and drop the timezone
                    # to avoid timezone mismatches on concat
                    timestamp_index = series.index
                    if hasattr(timestamp_index, 'tz_convert'):
                        timestamp_index = timestamp_index.tz_convert('UTC').tz_localize(None)

                    df = pd.DataFrame({
                        'timestamp': timestamp_index,
                        'storage_mwh': series.values,
                        'zone': zone
                    })

                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f" Warning: {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
    def collect_pumped_storage_generation(
        self,
        zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect pumped storage generation (B10 PSR type).

        Note: Consumption data is not separately available from the ENTSO-E API,
        so this returns generation-only data.

        Args:
            zone: Bidding zone code
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with pumped storage generation
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} pumped storage", leave=False):
            try:
                series = self.client.query_generation(
                    zone,
                    start=start_chunk,
                    end=end_chunk,
                    psr_type='B10'  # Hydro Pumped Storage
                )

                if series is not None and not series.empty:
                    # Handle both Series and DataFrame returns
                    if isinstance(series, pd.DataFrame):
                        # If multiple columns, take the first
                        series = series.iloc[:, 0]

                    # Convert the timestamp index to UTC and drop the timezone
                    # to avoid timezone mismatches on concat
                    timestamp_index = series.index
                    if hasattr(timestamp_index, 'tz_convert'):
                        timestamp_index = timestamp_index.tz_convert('UTC').tz_localize(None)

                    df = pd.DataFrame({
                        'timestamp': timestamp_index,
                        'generation_mw': series.values,
                        'zone': zone
                    })

                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f" Warning: {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
    def collect_load_forecast(
        self,
        zone: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect load forecast data.

        Args:
            zone: Bidding zone code
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with load forecast
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} load forecast", leave=False):
            try:
                series = self.client.query_load_forecast(
                    zone,
                    start=start_chunk,
                    end=end_chunk
                )

                if series is not None and not series.empty:
                    # Handle both Series and DataFrame returns
                    if isinstance(series, pd.DataFrame):
                        series = series.iloc[:, 0]

                    # Convert the timestamp index to UTC and drop the timezone
                    # to avoid timezone mismatches on concat
                    timestamp_index = series.index
                    if hasattr(timestamp_index, 'tz_convert'):
                        timestamp_index = timestamp_index.tz_convert('UTC').tz_localize(None)

                    df = pd.DataFrame({
                        'timestamp': timestamp_index,
                        'forecast_mw': series.values,
                        'zone': zone
                    })

                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f" Warning: {zone} {start_chunk.date()} to {end_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
    def collect_generation_outages(
        self,
        zone: str,
        start_date: str,
        end_date: str,
        psr_type: str = None
    ) -> pl.DataFrame:
        """Collect generation/production unit outages.

        Uses document type A77 (unavailability of generation units).
        Particularly important for nuclear planned outages, which are known
        months in advance and significantly impact cross-border flows.

        Weekly chunks are used to avoid API pagination limits (200 docs/request).
        France nuclear can have 228+ outage documents per month during peak periods.

        Deduplication: more recent reports of the same outage overwrite earlier
        ones. The API may return the same outage across multiple weekly queries
        as updates are published; only the most recent version per unique outage
        is kept.

        Args:
            zone: Bidding zone code
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            psr_type: Optional PSR type filter (B14=Nuclear, B04=Gas, B05=Coal, etc.)

        Returns:
            Polars DataFrame with generation unit outages.
            Columns: unit_name, psr_type, psr_name, capacity_mw,
                     start_time, end_time, businesstype, zone
        """
        chunks = self._generate_weekly_chunks(start_date, end_date)
        all_outages = []
        collection_order = 0  # Track order for deduplication (later = more recent)

        zone_eic = BIDDING_ZONE_EICS.get(zone)
        if not zone_eic:
            return pl.DataFrame()

        psr_name = PSR_TYPES.get(psr_type, psr_type) if psr_type else 'All'

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} {psr_name} outages", leave=False):
            collection_order += 1
            try:
                # Build query parameters
                params = {
                    'documentType': 'A77',  # Generation unavailability
                    'biddingZone_Domain': zone_eic
                }

                # Add PSR type filter if specified
                if psr_type:
                    params['psrType'] = psr_type

                # Query generation unavailability (raw bytes)
                response = self.client._base_request(
                    params=params,
                    start=start_chunk,
                    end=end_chunk
                )
                outages_zip = response.content

                # Parse ZIP and extract outage information
                with zipfile.ZipFile(BytesIO(outages_zip), 'r') as zf:
                    xml_files = [f for f in zf.namelist() if f.endswith('.xml')]

                    for xml_file in xml_files:
                        with zf.open(xml_file) as xf:
                            xml_content = xf.read()
                            root = ET.fromstring(xml_content)

                            # Get the default namespace
                            nsmap = dict([node for _, node in ET.iterparse(
                                BytesIO(xml_content), events=['start-ns']
                            )])
                            ns_uri = nsmap.get('', None)

                            # Find TimeSeries elements
                            if ns_uri:
                                timeseries_found = root.findall('.//{' + ns_uri + '}TimeSeries')
                            else:
                                timeseries_found = root.findall('.//TimeSeries')

                            for ts in timeseries_found:
                                # Extract production unit information
                                if ns_uri:
                                    prod_unit = ts.find('.//{' + ns_uri + '}Production_RegisteredResource')
                                else:
                                    prod_unit = ts.find('.//Production_RegisteredResource')

                                if prod_unit is not None:
                                    # Get unit details
                                    if ns_uri:
                                        name_elem = prod_unit.find('.//{' + ns_uri + '}name')
                                        psr_elem = prod_unit.find('.//{' + ns_uri + '}psrType')
                                    else:
                                        name_elem = prod_unit.find('.//name')
                                        psr_elem = prod_unit.find('.//psrType')

                                    unit_name = name_elem.text if name_elem is not None else 'Unknown'
                                    unit_psr = psr_elem.text if psr_elem is not None else psr_type

                                    # Extract outage periods and capacity
                                    if ns_uri:
                                        periods = ts.findall('.//{' + ns_uri + '}Unavailable_Period')
                                    else:
                                        periods = ts.findall('.//Unavailable_Period')

                                    for period in periods:
                                        if ns_uri:
                                            time_interval = period.find('.//{' + ns_uri + '}timeInterval')
                                            quantity_elem = period.find('.//{' + ns_uri + '}quantity')
                                        else:
                                            time_interval = period.find('.//timeInterval')
                                            quantity_elem = period.find('.//quantity')

                                        if time_interval is not None:
                                            if ns_uri:
                                                start_elem = time_interval.find('.//{' + ns_uri + '}start')
                                                end_elem = time_interval.find('.//{' + ns_uri + '}end')
                                            else:
                                                start_elem = time_interval.find('.//start')
                                                end_elem = time_interval.find('.//end')

                                            if start_elem is not None and end_elem is not None:
                                                # Get business type
                                                if ns_uri:
                                                    business_type_elem = root.find('.//{' + ns_uri + '}businessType')
                                                else:
                                                    business_type_elem = root.find('.//businessType')
                                                business_type = business_type_elem.text if business_type_elem is not None else 'Unknown'

                                                # Get capacity
                                                capacity_mw = float(quantity_elem.text) if quantity_elem is not None else 0.0

                                                all_outages.append({
                                                    'unit_name': unit_name,
                                                    'psr_type': unit_psr,
                                                    'psr_name': PSR_TYPES.get(unit_psr, unit_psr),
                                                    'capacity_mw': capacity_mw,
                                                    'start_time': pd.Timestamp(start_elem.text),
                                                    'end_time': pd.Timestamp(end_elem.text),
                                                    'businesstype': business_type,
                                                    'zone': zone,
                                                    'collection_order': collection_order
                                                })

                self._rate_limit()

            except Exception as e:
                # Empty response is OK (no outages)
                if "empty" not in str(e).lower():
                    print(f" Warning: {zone} {psr_name} {start_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_outages:
            df = pl.DataFrame(all_outages)

            # Deduplicate: keep only the most recent report of each unique outage.
            # More recent collections (higher collection_order) overwrite earlier
            # ones; a unique outage = same unit_name + start_time + end_time.
            df = df.sort('collection_order', descending=True)  # Most recent first
            df = df.unique(subset=['unit_name', 'start_time', 'end_time'], keep='first')

            # Remove the collection_order column (no longer needed)
            df = df.drop('collection_order')

            return df
        else:
            return pl.DataFrame()
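    # Sketch of the deduplication rule above (assumed example rows, not API
    # output): if the same outage is returned by two weekly queries - first as
    # an early estimate, later revised - only the later report survives, because
    # rows are sorted by collection_order descending before unique() keeps the
    # first occurrence per (unit_name, start_time, end_time):
    #
    #   unit 'X', 2025-03-01 -> 2025-04-15,  900 MW, collection_order=12  -> dropped
    #   unit 'X', 2025-03-01 -> 2025-04-15, 1300 MW, collection_order=15  -> kept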
    def collect_generation_by_psr_type(
        self,
        zone: str,
        psr_type: str,
        start_date: str,
        end_date: str
    ) -> pl.DataFrame:
        """Collect generation for a specific PSR type.

        Args:
            zone: Bidding zone code
            psr_type: PSR type code (e.g., 'B04' for Gas, 'B14' for Nuclear)
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            Polars DataFrame with generation data for the PSR type
        """
        chunks = self._generate_monthly_chunks(start_date, end_date)
        all_data = []
        psr_name = PSR_TYPES.get(psr_type, psr_type)

        for start_chunk, end_chunk in tqdm(chunks, desc=f" {zone} {psr_name}", leave=False):
            try:
                series = self.client.query_generation(
                    zone,
                    start=start_chunk,
                    end=end_chunk,
                    psr_type=psr_type
                )

                if series is not None and not series.empty:
                    # Handle both Series and DataFrame returns
                    if isinstance(series, pd.DataFrame):
                        series = series.iloc[:, 0]

                    # Convert the timestamp index to UTC to avoid timezone
                    # mismatches on concat
                    timestamp_index = series.index
                    if hasattr(timestamp_index, 'tz_convert'):
                        timestamp_index = timestamp_index.tz_convert('UTC')

                    df = pd.DataFrame({
                        'timestamp': timestamp_index,
                        'generation_mw': series.values,
                        'zone': zone,
                        'psr_type': psr_type,
                        'psr_name': psr_name
                    })

                    pl_df = pl.from_pandas(df)
                    all_data.append(pl_df)

                self._rate_limit()

            except Exception as e:
                print(f" Warning: {zone} {psr_name} {start_chunk.date()}: {e}")
                self._rate_limit()
                continue

        if all_data:
            return pl.concat(all_data)
        else:
            return pl.DataFrame()
    def collect_all(
        self,
        start_date: str,
        end_date: str,
        output_dir: Path
    ) -> dict:
        """Collect all ENTSO-E data with rate limiting.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_dir: Directory to save Parquet files

        Returns:
            Dictionary with paths to saved files
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        # Estimate the total number of API requests
        months = len(self._generate_monthly_chunks(start_date, end_date))
        total_requests = (
            len(BIDDING_ZONES) * months * 2 +  # Generation + load
            len(BORDERS) * months              # Flows
        )
        estimated_minutes = total_requests / self.requests_per_minute

        print("=" * 70)
        print("ENTSO-E Data Collection")
        print("=" * 70)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Bidding zones: {len(BIDDING_ZONES)}")
        print(f"Cross-border flows: {len(BORDERS)}")
        print(f"Monthly chunks: {months}")
        print(f"Total requests: ~{total_requests}")
        print(f"Rate limit: {self.requests_per_minute} requests/minute (45% of 60 max)")
        print(f"Estimated time: {estimated_minutes:.1f} minutes")
        print()

        results = {}

        # 1. Collect generation data
        print("[1/3] Collecting generation data by production type...")
        generation_data = []
        for zone in tqdm(BIDDING_ZONES.keys(), desc="Generation"):
            df = self.collect_generation_per_type(zone, start_date, end_date)
            if not df.is_empty():
                generation_data.append(df)

        if generation_data:
            generation_df = pl.concat(generation_data)
            gen_path = output_dir / "entsoe_generation_2024_2025.parquet"
            generation_df.write_parquet(gen_path)
            results['generation'] = gen_path
            print(f"✅ Generation: {generation_df.shape[0]:,} records → {gen_path}")

        # 2. Collect load data
        print("\n[2/3] Collecting load (demand) data...")
        load_data = []
        for zone in tqdm(BIDDING_ZONES.keys(), desc="Load"):
            df = self.collect_load(zone, start_date, end_date)
            if not df.is_empty():
                load_data.append(df)

        if load_data:
            load_df = pl.concat(load_data)
            load_path = output_dir / "entsoe_load_2024_2025.parquet"
            load_df.write_parquet(load_path)
            results['load'] = load_path
            print(f"✅ Load: {load_df.shape[0]:,} records → {load_path}")

        # 3. Collect cross-border flows
        print("\n[3/3] Collecting cross-border flows...")
        flow_data = []
        for from_zone, to_zone in tqdm(BORDERS, desc="Flows"):
            df = self.collect_cross_border_flows(from_zone, to_zone, start_date, end_date)
            if not df.is_empty():
                flow_data.append(df)

        if flow_data:
            flow_df = pl.concat(flow_data)
            flow_path = output_dir / "entsoe_flows_2024_2025.parquet"
            flow_df.write_parquet(flow_path)
            results['flows'] = flow_path
            print(f"✅ Flows: {flow_df.shape[0]:,} records → {flow_path}")

        print()
        print("=" * 70)
        print("ENTSO-E Collection Complete")
        print("=" * 70)
        print(f"Total API requests made: {self.request_count}")
        print(f"Files created: {len(results)}")
        for data_type, path in results.items():
            file_size = path.stat().st_size / (1024**2)
            print(f" - {data_type}: {file_size:.1f} MB")

        return results
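    # Hedged usage sketch for the orchestration method above (comments only;
    # requires a configured ENTSOE_API_KEY and live API access; the output
    # directory matches the CLI default below):
    #
    #   collector = EntsoECollector(requests_per_minute=27)
    #   results = collector.collect_all('2024-10-01', '2025-09-30', Path('data/raw'))
    #   # results -> {'generation': Path(...), 'load': Path(...), 'flows': Path(...)}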
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Collect ENTSO-E data with proper rate limiting") | |
| parser.add_argument( | |
| '--start-date', | |
| default='2024-10-01', | |
| help='Start date (YYYY-MM-DD)' | |
| ) | |
| parser.add_argument( | |
| '--end-date', | |
| default='2025-09-30', | |
| help='End date (YYYY-MM-DD)' | |
| ) | |
| parser.add_argument( | |
| '--output-dir', | |
| type=Path, | |
| default=Path('data/raw'), | |
| help='Output directory for Parquet files' | |
| ) | |
| parser.add_argument( | |
| '--requests-per-minute', | |
| type=int, | |
| default=27, | |
| help='Requests per minute (default: 27 = 45%% of 60 limit)' | |
| ) | |
| args = parser.parse_args() | |
| # Initialize collector and run | |
| collector = EntsoECollector(requests_per_minute=args.requests_per_minute) | |
| collector.collect_all( | |
| start_date=args.start_date, | |
| end_date=args.end_date, | |
| output_dir=args.output_dir | |
| ) | |