| """JAO FBMC Data Collection using jao-py Python Library | |
| Collects FBMC (Flow-Based Market Coupling) data from JAO Publication Tool. | |
| Uses the jao-py Python package for API access. | |
| Data Available from JaoPublicationToolPandasClient: | |
| - Core FBMC Day-Ahead: From June 9, 2022 onwards | |
| Discovered Methods (17 total): | |
| 1. query_maxbex(day) - Maximum Bilateral Exchange (TARGET VARIABLE) | |
| 2. query_active_constraints(day) - Active CNECs with shadow prices/RAM | |
| 3. query_final_domain(mtu) - Final flowbased domain (PTDFs) | |
| 4. query_lta(d_from, d_to) - Long Term Allocations (LTN) | |
| 5. query_minmax_np(day) - Min/Max Net Positions | |
| 6. query_net_position(day) - Actual net positions | |
| 7. query_scheduled_exchange(d_from, d_to) - Scheduled exchanges | |
| 8. query_monitoring(day) - Monitoring data (may contain RAM/shadow prices) | |
| 9. query_allocationconstraint(d_from, d_to) - Allocation constraints | |
| 10. query_alpha_factor(d_from, d_to) - Alpha factors | |
| 11. query_d2cf(d_from, d_to) - Day-2 Cross Flow | |
| 12. query_initial_domain(mtu) - Initial domain | |
| 13. query_prefinal_domain(mtu) - Pre-final domain | |
| 14. query_price_spread(d_from, d_to) - Price spreads | |
| 15. query_refprog(d_from, d_to) - Reference program | |
| 16. query_status(d_from, d_to) - Status information | |
| 17. query_validations(d_from, d_to) - Validation data | |
| Documentation: https://github.com/fboerman/jao-py | |
| """ | |
import polars as pl
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, List
from tqdm import tqdm
import pandas as pd

try:
    from jao import JaoPublicationToolPandasClient
except ImportError:
    raise ImportError(
        "jao-py not installed. Install with: uv pip install jao-py"
    )
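

# Hedged usage sketch (defined but never called): how the underlying jao-py client can
# be queried directly, outside of JAOCollector. The method names follow the list in the
# module docstring above; day-based queries take a single tz-aware pandas Timestamp,
# range-based queries take a (d_from, d_to) pair. The exact return columns should be
# verified against the jao-py source before relying on them.
def _example_direct_queries() -> None:
    """Illustrative only: one day-based and one range-based jao-py query."""
    client = JaoPublicationToolPandasClient()

    # Day-based query: MaxBEX for a single day (tz-aware timestamp required).
    day = pd.Timestamp("2024-10-01", tz="UTC")
    maxbex_df = client.query_maxbex(day)
    print(maxbex_df.shape)

    # Range-based query: Long Term Allocations for a whole week in one call.
    lta_df = client.query_lta(
        pd.Timestamp("2024-10-01", tz="UTC"),
        pd.Timestamp("2024-10-07", tz="UTC"),
    )
    print(lta_df.shape)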


class JAOCollector:
    """Collect FBMC data using the jao-py Python library."""

    def __init__(self):
        """Initialize JAO collector.

        Note: JaoPublicationToolPandasClient() takes no init parameters.
        """
        self.client = JaoPublicationToolPandasClient()
        print("JAO Publication Tool Client initialized")
        print("Data available: Core FBMC from 2022-06-09 onwards")

    def _generate_date_range(
        self,
        start_date: str,
        end_date: str
    ) -> List[datetime]:
        """Generate the list of dates (one per calendar day, inclusive) for data collection.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)

        Returns:
            List of datetime objects, one per day
        """
        start_dt = datetime.fromisoformat(start_date)
        end_dt = datetime.fromisoformat(end_date)

        dates = []
        current = start_dt
        while current <= end_dt:
            dates.append(current)
            current += timedelta(days=1)

        return dates

    def collect_maxbex_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect MaxBEX (Maximum Bilateral Exchange) data - TARGET VARIABLE.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with MaxBEX data
        """
        import time

        print("=" * 70)
        print("JAO MaxBEX Data Collection (TARGET VARIABLE)")
        print("=" * 70)

        dates = self._generate_date_range(start_date, end_date)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Total dates: {len(dates)}")
        print()

        all_data = []
        for date in tqdm(dates, desc="Collecting MaxBEX"):
            try:
                # Convert to pandas Timestamp with UTC timezone (required by jao-py)
                pd_date = pd.Timestamp(date, tz='UTC')

                # Query MaxBEX data
                df = self.client.query_maxbex(pd_date)

                if df is not None and not df.empty:
                    all_data.append(df)

                # Rate limiting: 5 seconds between requests
                time.sleep(5)

            except Exception as e:
                print(f"  Failed for {date.date()}: {e}")
                continue

        if all_data:
            # Combine all dataframes, keeping the time index
            combined_df = pd.concat(all_data, ignore_index=False)

            # Reset the index before converting: pl.from_pandas drops the pandas index,
            # so the timestamps would otherwise be lost (cf. the mtu index handling in
            # collect_net_positions_sample)
            combined_df = combined_df.reset_index()

            # Convert to Polars
            pl_df = pl.from_pandas(combined_df)

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print()
            print("=" * 70)
            print("MaxBEX Collection Complete")
            print("=" * 70)
            print(f"Total records: {pl_df.shape[0]:,}")
            print(f"Columns: {pl_df.shape[1]}")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB")

            return pl_df
        else:
            print("No MaxBEX data collected")
            return None

    def collect_cnec_ptdf_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect Active Constraints (CNECs + PTDFs in ONE call).

        Column Selection Strategy:
        - KEEP (25-26 columns):
            * Identifiers: tso, cnec_name, cnec_eic, direction, cont_name
            * Primary features: fmax, ram, shadow_price
            * PTDFs: ptdf_AT, ptdf_BE, ptdf_CZ, ptdf_DE, ptdf_FR, ptdf_HR,
              ptdf_HU, ptdf_NL, ptdf_PL, ptdf_RO, ptdf_SI, ptdf_SK
            * Additional features: fuaf, frm, ram_mcp, f0core, imax
            * Metadata: collection_date
        - DISCARD (14-17 columns):
            * Redundant: hubFrom, hubTo (derive during feature engineering)
            * Redundant with fuaf: f0all (r≈0.99)
            * Intermediate: amr, cva, iva, min_ram_factor, max_z2_z_ptdf
            * Empty/separate source: lta_margin (100% zero, get from LTA dataset)
            * Too granular: ftotal_ltn, branch_eic, fref
            * Non-Core FBMC: ptdf_ALBE, ptdf_ALDE

        Data Transformations:
        - Shadow prices: kept rounded to 2 decimals, plus log(price + 1) rounded to 4 decimals
        - RAM: clipped to [0, fmax] range
        - PTDFs: clipped to [-1.5, +1.5] range
        - All other floats: rounded to 2 decimals (storage optimization)

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with CNEC and PTDF data
        """
        import time

        print("=" * 70)
        print("JAO Active Constraints Collection (CNECs + PTDFs)")
        print("=" * 70)

        dates = self._generate_date_range(start_date, end_date)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Total dates: {len(dates)}")
        print()

        all_data = []
        for date in tqdm(dates, desc="Collecting CNECs/PTDFs"):
            try:
                # Convert to pandas Timestamp with UTC timezone (required by jao-py)
                pd_date = pd.Timestamp(date, tz='UTC')

                # Query active constraints (includes CNECs + PTDFs!)
                df = self.client.query_active_constraints(pd_date)

                if df is not None and not df.empty:
                    # Add date column for reference
                    df['collection_date'] = date
                    all_data.append(df)

                # Rate limiting: 5 seconds between requests
                time.sleep(5)

            except Exception as e:
                print(f"  Failed for {date.date()}: {e}")
                continue

        if all_data:
            # Combine all dataframes
            combined_df = pd.concat(all_data, ignore_index=True)

            # Convert to Polars for efficient column operations
            pl_df = pl.from_pandas(combined_df)

            # --- DATA CLEANING & TRANSFORMATIONS ---

            # 1. Shadow Price: log transform + round (NO clipping)
            if 'shadow_price' in pl_df.columns:
                pl_df = pl_df.with_columns([
                    # Keep original rounded to 2 decimals
                    pl.col('shadow_price').round(2).alias('shadow_price'),
                    # Add log-transformed version (4 decimals)
                    (pl.col('shadow_price') + 1).log().round(4).alias('shadow_price_log')
                ])
                print("  [OK] Shadow price: log transform applied (no clipping)")

            # 2. RAM: clip to [0, fmax] and round
            if 'ram' in pl_df.columns and 'fmax' in pl_df.columns:
                pl_df = pl_df.with_columns([
                    pl.when(pl.col('ram') < 0)
                    .then(0)
                    .when(pl.col('ram') > pl.col('fmax'))
                    .then(pl.col('fmax'))
                    .otherwise(pl.col('ram'))
                    .round(2)
                    .alias('ram')
                ])
                print("  [OK] RAM: clipped to [0, fmax] range")

            # 3. PTDFs: clip to [-1.5, +1.5] and round to 4 decimals (precision needed)
            ptdf_cols = [col for col in pl_df.columns if col.startswith('ptdf_')]
            if ptdf_cols:
                pl_df = pl_df.with_columns([
                    pl.col(col).clip(-1.5, 1.5).round(4).alias(col)
                    for col in ptdf_cols
                ])
                print(f"  [OK] PTDFs: {len(ptdf_cols)} columns clipped to [-1.5, +1.5]")

            # 4. Other float columns: round to 2 decimals
            # (exclude shadow_price_log so its 4-decimal precision is not overwritten)
            float_cols = [col for col in pl_df.columns
                          if pl_df[col].dtype in [pl.Float64, pl.Float32]
                          and col not in ['shadow_price', 'shadow_price_log', 'ram'] + ptdf_cols]
            if float_cols:
                pl_df = pl_df.with_columns([
                    pl.col(col).round(2).alias(col)
                    for col in float_cols
                ])
                print(f"  [OK] Other floats: {len(float_cols)} columns rounded to 2 decimals")

            # --- COLUMN SELECTION ---
            # Define columns to keep
            keep_cols = [
                # Identifiers
                'tso', 'cnec_name', 'cnec_eic', 'direction', 'cont_name',
                # Primary features
                'fmax', 'ram', 'shadow_price', 'shadow_price_log',
                # Additional features
                'fuaf', 'frm', 'ram_mcp', 'f0core', 'imax',
                # PTDFs (all Core FBMC zones)
                'ptdf_AT', 'ptdf_BE', 'ptdf_CZ', 'ptdf_DE', 'ptdf_FR', 'ptdf_HR',
                'ptdf_HU', 'ptdf_NL', 'ptdf_PL', 'ptdf_RO', 'ptdf_SI', 'ptdf_SK',
                # Metadata
                'collection_date'
            ]

            # Filter to only columns that exist in the dataframe
            existing_keep_cols = [col for col in keep_cols if col in pl_df.columns]
            discarded_cols = [col for col in pl_df.columns if col not in existing_keep_cols]

            # Select only kept columns
            pl_df = pl_df.select(existing_keep_cols)

            print()
            print(f"  [OK] Column selection: {len(existing_keep_cols)} kept, {len(discarded_cols)} discarded")
            if discarded_cols:
                print(f"       Discarded: {', '.join(sorted(discarded_cols)[:10])}...")

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print()
            print("=" * 70)
            print("CNEC/PTDF Collection Complete")
            print("=" * 70)
            print(f"Total records: {pl_df.shape[0]:,}")
            print(f"Columns: {pl_df.shape[1]} ({len(existing_keep_cols)} kept)")
            print("CNEC fields: tso, cnec_name, cnec_eic, direction, shadow_price")
            print("Features: fmax, ram, fuaf, frm, shadow_price_log")
            print("PTDF fields: ptdf_AT, ptdf_BE, ptdf_CZ, ptdf_DE, ptdf_FR, etc.")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")

            return pl_df
        else:
            print("No CNEC/PTDF data collected")
            return None

    def collect_lta_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect LTA (Long Term Allocation) data - separate from CNEC data.

        Note: lta_margin in the CNEC data is 100% zero under the Extended LTA approach.
        This method collects actual LTA allocations from the dedicated LTA publication.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with LTA data
        """
        print("=" * 70)
        print("JAO LTA Data Collection (Long Term Allocations)")
        print("=" * 70)

        # LTA query uses a date range, not individual days
        print(f"Date range: {start_date} to {end_date}")
        print()

        try:
            # Convert to pandas Timestamps with UTC timezone
            pd_start = pd.Timestamp(start_date, tz='UTC')
            pd_end = pd.Timestamp(end_date, tz='UTC')

            # Query LTA data for the entire period
            print("Querying LTA data...")
            df = self.client.query_lta(pd_start, pd_end)

            if df is not None and not df.empty:
                # Convert to Polars
                pl_df = pl.from_pandas(df)

                # Round float columns to 2 decimals
                float_cols = [col for col in pl_df.columns
                              if pl_df[col].dtype in [pl.Float64, pl.Float32]]
                if float_cols:
                    pl_df = pl_df.with_columns([
                        pl.col(col).round(2).alias(col)
                        for col in float_cols
                    ])

                # Save to parquet
                output_path.parent.mkdir(parents=True, exist_ok=True)
                pl_df.write_parquet(output_path)

                print()
                print("=" * 70)
                print("LTA Collection Complete")
                print("=" * 70)
                print(f"Total records: {pl_df.shape[0]:,}")
                print(f"Columns: {pl_df.shape[1]}")
                print(f"Output: {output_path}")
                print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")

                return pl_df
            else:
                print("⚠️ No LTA data available for this period")
                return None

        except Exception as e:
            print(f"❌ LTA collection failed: {e}")
            print("   This may be expected if LTA data is not published for this period")
            return None

    def collect_net_positions_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect Net Position bounds (Min/Max) for Core FBMC zones.

        Net positions define the domain boundaries for each bidding zone and are
        essential for understanding feasible commercial exchange patterns.

        Implements JAO API rate limiting:
        - 100 requests/minute limit
        - 1 second between requests (60 req/min, leaving a safety margin)
        - Exponential backoff on 429 errors

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with net position data
        """
        import time
        from requests.exceptions import HTTPError

        print("=" * 70)
        print("JAO Net Position Data Collection (Min/Max Bounds)")
        print("=" * 70)

        dates = self._generate_date_range(start_date, end_date)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Total dates: {len(dates)}")
        print("Rate limiting: 1s between requests, exponential backoff on 429")
        print()

        all_data = []
        failed_dates = []

        for date in tqdm(dates, desc="Collecting Net Positions"):
            # Retry logic with exponential backoff
            max_retries = 5
            base_delay = 60  # Start with 60s on 429 error

            for attempt in range(max_retries):
                try:
                    # Rate limiting: 1 second between all requests
                    time.sleep(1)

                    # Convert to pandas Timestamp with UTC timezone
                    pd_date = pd.Timestamp(date, tz='UTC')

                    # Query min/max net positions
                    df = self.client.query_minmax_np(pd_date)

                    if df is not None and not df.empty:
                        # CRITICAL: Reset index to preserve mtu timestamps
                        # Net positions have hourly 'mtu' timestamps in the index
                        df_with_index = df.reset_index()

                        # Add date column for reference
                        df_with_index['collection_date'] = date
                        all_data.append(df_with_index)
                        break  # Success - exit retry loop

                except HTTPError as e:
                    if e.response.status_code == 429:
                        # Rate limited - exponential backoff
                        wait_time = base_delay * (2 ** attempt)
                        if attempt < max_retries - 1:
                            time.sleep(wait_time)
                        else:
                            failed_dates.append((date, "429 after retries"))
                    else:
                        # Other HTTP error - don't retry
                        failed_dates.append((date, str(e)))
                        break

                except Exception as e:
                    # Non-HTTP error
                    failed_dates.append((date, str(e)))
                    break

        # Report results
        print()
        print("=" * 70)
        print("Net Position Collection Complete")
        print("=" * 70)
        print(f"Success: {len(all_data)}/{len(dates)} dates")

        if failed_dates:
            print(f"Failed: {len(failed_dates)} dates")
            if len(failed_dates) <= 10:
                for date, error in failed_dates:
                    print(f"  {date.date()}: {error}")
            else:
                print("  First 10 failures:")
                for date, error in failed_dates[:10]:
                    print(f"  {date.date()}: {error}")

        if all_data:
            # Combine all dataframes
            combined_df = pd.concat(all_data, ignore_index=True)

            # Convert to Polars
            pl_df = pl.from_pandas(combined_df)

            # Round float columns to 2 decimals
            float_cols = [col for col in pl_df.columns
                          if pl_df[col].dtype in [pl.Float64, pl.Float32]]
            if float_cols:
                pl_df = pl_df.with_columns([
                    pl.col(col).round(2).alias(col)
                    for col in float_cols
                ])

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print()
            print(f"Total records: {pl_df.shape[0]:,}")
            print(f"Columns: {pl_df.shape[1]}")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")
            print("=" * 70)

            return pl_df
        else:
            print("\n[WARNING] No Net Position data collected")
            print("=" * 70)
            return None
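
    # Hedged sketch (not wired into the methods above): the retry-with-backoff pattern
    # used in collect_net_positions_sample, factored into a reusable helper. It assumes
    # the wrapped callable raises requests.exceptions.HTTPError with a 429 status when
    # the JAO rate limit is hit; max_retries/base_delay mirror the values used above.
    @staticmethod
    def _retry_with_backoff(func, *args, max_retries: int = 5, base_delay: float = 60.0, **kwargs):
        """Illustrative only: call func(*args, **kwargs), backing off exponentially on HTTP 429."""
        import time
        from requests.exceptions import HTTPError

        for attempt in range(max_retries):
            try:
                return func(*args, **kwargs)
            except HTTPError as e:
                status = e.response.status_code if e.response is not None else None
                if status == 429 and attempt < max_retries - 1:
                    # Wait 60s, 120s, 240s, ... before retrying
                    time.sleep(base_delay * (2 ** attempt))
                else:
                    raise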

    def collect_external_atc_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect ATC (Available Transfer Capacity) for external (non-Core) borders.

        External borders connect Core FBMC to non-Core zones (e.g., FR-UK, DE-CH, PL-SE).
        These capacities affect loop flows and provide context for Core network loading.

        NOTE: This method needs to be implemented once the correct JAO API endpoint
        for external ATC is identified. Possible sources:
        - JAO ATC publications (separate from Core FBMC)
        - ENTSO-E Transparency Platform (Forecasted/Offered Capacity)
        - Bilateral capacity publications

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with external ATC data
        """
        print("=" * 70)
        print("JAO External ATC Data Collection (Non-Core Borders)")
        print("=" * 70)
        print("[WARN] IMPLEMENTATION PENDING - Need to identify correct API endpoint")
        print()

        # TODO: Research the correct JAO API method for external ATC
        # Candidates:
        #   1. JAO ATC-specific publications (if they exist)
        #   2. ENTSO-E Transparency API (Forecasted Transfer Capacities)
        #   3. Bilateral capacity allocations from TSO websites
        #
        # External borders of interest (14 borders × 2 directions = 28):
        #   FR-UK, FR-ES, FR-CH, FR-IT
        #   DE-CH, DE-DK1, DE-DK2, DE-NO2, DE-SE4
        #   PL-SE4, PL-UA
        #   CZ-UA
        #   RO-UA, RO-MD

        # For now, return None and document that this needs implementation;
        # a hedged ENTSO-E sketch follows this method.
        print("External ATC collection not yet implemented.")
        print("Potential data sources:")
        print("  1. ENTSO-E Transparency API: Forecasted Transfer Capacities (Day Ahead)")
        print("  2. JAO bilateral capacity publications")
        print("  3. TSO-specific capacity publications")
        print()
        print("Recommendation: Collect from ENTSO-E API for consistency")
        print("=" * 70)

        return None
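
    # Hedged sketch of the ENTSO-E route recommended above. NOT part of the JAO client:
    # it assumes the entsoe-py package (EntsoePandasClient), its
    # query_net_transfer_capacity_dayahead method, the bidding-zone codes used, and an
    # ENTSOE_API_KEY environment variable - all assumptions to verify against the
    # entsoe-py documentation before use.
    def _external_atc_entsoe_sketch(
        self,
        start_date: str,
        end_date: str,
        borders: Optional[List[tuple]] = None
    ) -> Optional[pl.DataFrame]:
        """Illustrative only: day-ahead NTC for external borders via ENTSO-E."""
        import os
        from entsoe import EntsoePandasClient  # assumed dependency, not installed by default

        client = EntsoePandasClient(api_key=os.environ["ENTSOE_API_KEY"])
        start = pd.Timestamp(start_date, tz='UTC')
        end = pd.Timestamp(end_date, tz='UTC')

        # A few of the external borders listed in collect_external_atc_sample
        # (zone codes are assumed entsoe-py area identifiers)
        borders = borders or [('FR', 'GB'), ('DE_LU', 'CH'), ('PL', 'SE_4')]

        frames = []
        for zone_from, zone_to in borders:
            # Assumed entsoe-py call; expected to return a time-indexed pandas Series in MW
            series = client.query_net_transfer_capacity_dayahead(
                zone_from, zone_to, start=start, end=end
            )
            df = series.rename('atc_mw').rename_axis('mtu').reset_index()
            df['border'] = f"{zone_from}-{zone_to}"
            frames.append(df)

        return pl.from_pandas(pd.concat(frames, ignore_index=True)) if frames else None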

    def collect_final_domain_dense(
        self,
        start_date: str,
        end_date: str,
        target_cnec_eics: List[str],
        output_path: Path,
        use_mirror: bool = True
    ) -> Optional[pl.DataFrame]:
        """Collect DENSE CNEC time series for specific CNECs from the Final Domain.

        Phase 2 collection method: gets the complete hourly time series for target CNECs
        (binding AND non-binding states) to enable time-series feature engineering.

        This method queries the JAO Final Domain publication, which contains ALL CNECs
        for each hour (DENSE format), not just active/binding constraints.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            target_cnec_eics: List of CNEC EIC codes to collect (e.g., the 200 critical CNECs from Phase 1)
            output_path: Path to save Parquet file
            use_mirror: Use mirror.flowbased.eu for faster bulk downloads (recommended)

        Returns:
            Polars DataFrame with DENSE CNEC time series data

        Data Structure:
        - DENSE format: each CNEC appears every hour (binding or not)
        - Columns: mtu (timestamp), tso, cnec_name, cnec_eic, direction, presolved,
          ram, fmax, shadow_price, frm, fuaf, ptdf_AT, ptdf_BE, ..., ptdf_SK
        - presolved field: True = binding, False = redundant (non-binding)
        - Non-binding hours: shadow_price = 0, ram = fmax

        Notes:
        - The mirror method is MUCH faster: 1 request/day vs 24 requests/day
        - Cannot filter by EIC on the server side - downloads all CNECs, then filters locally
        - For 200 CNECs × 24 months: ~3.5M records (~100-150 MB compressed)
        """
        import time

        print("=" * 70)
        print("JAO Final Domain DENSE CNEC Collection (Phase 2)")
        print("=" * 70)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Target CNECs: {len(target_cnec_eics)}")
        print(f"Method: {'Mirror (bulk daily)' if use_mirror else 'Hourly API calls'}")
        print()

        dates = self._generate_date_range(start_date, end_date)
        print(f"Total dates: {len(dates)}")
        print(f"Expected records: {len(target_cnec_eics)} CNECs × {len(dates) * 24} hours = {len(target_cnec_eics) * len(dates) * 24:,}")
        print()

        all_data = []
        for date in tqdm(dates, desc="Collecting Final Domain"):
            try:
                # Convert to pandas Timestamp in the market timezone (Europe/Amsterdam)
                pd_date = pd.Timestamp(date, tz='Europe/Amsterdam')

                # Query the Final Domain for the first hour of the day.
                # If use_mirror=True, this returns the entire day (24 hours) at once.
                df = self.client.query_final_domain(
                    mtu=pd_date,
                    presolved=None,  # ALL CNECs (binding + non-binding) = DENSE!
                    use_mirror=use_mirror
                )

                if df is not None and not df.empty:
                    # Filter to target CNECs only (local filtering); copy so the added
                    # column below does not write into a slice of the original frame
                    df_filtered = df[df['cnec_eic'].isin(target_cnec_eics)].copy()

                    if not df_filtered.empty:
                        # Add collection date for reference
                        df_filtered['collection_date'] = date
                        all_data.append(df_filtered)

                # Rate limiting for non-mirror mode
                if not use_mirror:
                    time.sleep(1)  # 1 second between requests

            except Exception as e:
                print(f"  Failed for {date.date()}: {e}")
                continue

        if all_data:
            # Combine all dataframes
            combined_df = pd.concat(all_data, ignore_index=True)

            # Convert to Polars
            pl_df = pl.from_pandas(combined_df)

            # Validate the DENSE structure
            unique_cnecs = pl_df['cnec_eic'].n_unique()
            unique_hours = pl_df['mtu'].n_unique()
            expected_records = unique_cnecs * unique_hours
            actual_records = len(pl_df)

            print()
            print("=" * 70)
            print("Final Domain DENSE Collection Complete")
            print("=" * 70)
            print(f"Total records: {actual_records:,}")
            print(f"Unique CNECs: {unique_cnecs}")
            print(f"Unique hours: {unique_hours}")
            print(f"Expected (DENSE): {expected_records:,}")

            if actual_records == expected_records:
                print("[OK] DENSE structure validated - all CNECs present every hour")
            else:
                print(f"[WARN] Structure is SPARSE! Missing {expected_records - actual_records:,} records")
                print("       Some CNECs may be missing for some hours")

            # Round float columns to 4 decimals (higher precision for PTDFs)
            float_cols = [col for col in pl_df.columns
                          if pl_df[col].dtype in [pl.Float64, pl.Float32]]
            if float_cols:
                pl_df = pl_df.with_columns([
                    pl.col(col).round(4).alias(col)
                    for col in float_cols
                ])

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print(f"Columns: {pl_df.shape[1]}")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")
            print("=" * 70)

            return pl_df
        else:
            print("No Final Domain data collected")
            return None

    def collect_cnec_data(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect CNEC (Critical Network Elements with Contingencies) data.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with CNEC data
        """
        print("=" * 70)
        print("JAO CNEC Data Collection")
        print("=" * 70)

        dates = self._generate_date_range(start_date, end_date)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Total dates: {len(dates)}")
        print()

        all_data = []
        for date in tqdm(dates, desc="Collecting CNEC data"):
            try:
                # Get CNEC data for this date.
                # Note: query_cnec is not among the discovered methods listed in the
                # module docstring and may not exist in jao-py; the working path for
                # CNEC data is collect_cnec_ptdf_sample (query_active_constraints).
                df = self.client.query_cnec(date)

                if df is not None and not df.empty:
                    # Add date column
                    df['collection_date'] = date
                    all_data.append(df)

            except Exception as e:
                print(f"  ⚠️ Failed for {date.date()}: {e}")
                continue

        if all_data:
            # Combine all dataframes
            combined_df = pd.concat(all_data, ignore_index=True)

            # Convert to Polars
            pl_df = pl.from_pandas(combined_df)

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print()
            print("=" * 70)
            print("CNEC Collection Complete")
            print("=" * 70)
            print(f"Total records: {pl_df.shape[0]:,}")
            print(f"Columns: {pl_df.shape[1]}")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB")

            return pl_df
        else:
            print("❌ No CNEC data collected")
            return None

    def collect_all_core_data(
        self,
        start_date: str,
        end_date: str,
        output_dir: Path
    ) -> dict:
        """Collect all available Core FBMC data.

        This orchestrator will be expanded as collection wrappers for the remaining
        jao-py methods (see the module docstring) are wired in.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_dir: Directory to save Parquet files

        Returns:
            Dictionary with paths to saved files
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        print("=" * 70)
        print("JAO Core FBMC Data Collection")
        print("=" * 70)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Output directory: {output_dir}")
        print()

        results = {}

        # Note: the jao-py documentation is sparse. The methods discovered so far are
        # listed in the module docstring (query_maxbex, query_active_constraints,
        # query_final_domain, query_lta, query_minmax_np, ...).
        print("⚠️ Note: jao-py has limited documentation.")
        print("   Available methods need to be discovered from source code.")
        print("   See: https://github.com/fboerman/jao-py")
        print()

        # Try to collect CNECs (if the method exists)
        try:
            cnec_path = output_dir / "jao_cnec_2024_2025.parquet"
            cnec_df = self.collect_cnec_data(start_date, end_date, cnec_path)
            if cnec_df is not None:
                results['cnec'] = cnec_path
        except AttributeError as e:
            print(f"⚠️ CNEC collection not available: {e}")
            print("   Check jao-py source for correct method names")

        # Placeholder for additional data types: wrappers for MaxBEX, active constraints,
        # LTA and net positions exist above but are not yet called from this orchestrator.

        print()
        print("=" * 70)
        print("JAO Collection Summary")
        print("=" * 70)
        print(f"Files created: {len(results)}")
        for data_type, path in results.items():
            file_size = path.stat().st_size / (1024**2)
            print(f"  - {data_type}: {file_size:.1f} MB")

        if not results:
            print()
            print("⚠️ No data collected. This likely means:")
            print("   1. The date range is outside available data (before 2022-06-09)")
            print("   2. The jao-py methods need to be discovered from source code")
            print("   3. Alternative: Manual download from https://publicationtool.jao.eu/core/")

        return results
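

# Hedged usage sketch (defined but never called): how the sample collection methods
# above can be driven directly for a short window. The one-week range and output
# filenames are illustrative, not prescribed anywhere in this module.
def _example_weekly_samples(output_dir: Path = Path('data/raw')) -> None:
    """Illustrative only: collect one week of MaxBEX, CNEC/PTDF and net position data."""
    collector = JAOCollector()
    collector.collect_maxbex_sample('2024-10-01', '2024-10-07', output_dir / 'jao_maxbex_sample.parquet')
    collector.collect_cnec_ptdf_sample('2024-10-01', '2024-10-07', output_dir / 'jao_cnec_ptdf_sample.parquet')
    collector.collect_net_positions_sample('2024-10-01', '2024-10-07', output_dir / 'jao_net_positions_sample.parquet')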


def print_jao_manual_instructions():
    """Print manual download instructions for JAO data."""
    print("""
╔══════════════════════════════════════════════════════════════════════════╗
║                      JAO DATA ACCESS INSTRUCTIONS                         ║
╚══════════════════════════════════════════════════════════════════════════╝

Option 1: Use jao-py Python Library (Recommended)
-------------------------------------------------
Installed: ✅ jao-py 0.6.2

Available clients:
  - JaoPublicationToolPandasClient (Core Day-Ahead, from 2022-06-09)
  - JaoPublicationToolPandasIntraDay (Core Intraday, from 2024-05-29)
  - JaoPublicationToolPandasNordics (Nordic, from 2024-10-30)

Documentation: https://github.com/fboerman/jao-py

Note: jao-py has sparse documentation. Method discovery required:
  1. Explore source code: https://github.com/fboerman/jao-py
  2. Check available methods: dir(client)
  3. Inspect method signatures: help(client.method_name)

Option 2: Manual Download from JAO Website
------------------------------------------
1. Visit: https://publicationtool.jao.eu/core/
2. Navigate to data sections:
   - CNECs (Critical Network Elements)
   - PTDFs (Power Transfer Distribution Factors)
   - RAMs (Remaining Available Margins)
   - Shadow Prices
   - Net Positions
3. Select date range: Oct 2024 - Sept 2025
4. Download format: CSV or Excel
5. Save files to: data/raw/
6. File naming convention:
   - jao_cnec_2024-10_2025-09.csv
   - jao_ptdf_2024-10_2025-09.csv
   - jao_ram_2024-10_2025-09.csv
7. Convert to Parquet (see the csv_to_parquet helper below for a starting point)

Option 3: R Package JAOPuTo (Alternative)
-----------------------------------------
If you have R installed:

```r
install.packages("devtools")
devtools::install_github("nicoschoutteet/JAOPuTo")
# Then export data to CSV for Python ingestion
```

Option 4: Contact JAO Support
-----------------------------
Email: [email protected]
Subject: Bulk FBMC data download for research
Request: Core FBMC data, Oct 2024 - Sept 2025

════════════════════════════════════════════════════════════════════════════
""")
| if __name__ == "__main__": | |
| import argparse | |
| parser = argparse.ArgumentParser(description="Collect JAO FBMC data using jao-py") | |
| parser.add_argument( | |
| '--start-date', | |
| default='2024-10-01', | |
| help='Start date (YYYY-MM-DD)' | |
| ) | |
| parser.add_argument( | |
| '--end-date', | |
| default='2025-09-30', | |
| help='End date (YYYY-MM-DD)' | |
| ) | |
| parser.add_argument( | |
| '--output-dir', | |
| type=Path, | |
| default=Path('data/raw'), | |
| help='Output directory for Parquet files' | |
| ) | |
| parser.add_argument( | |
| '--manual-instructions', | |
| action='store_true', | |
| help='Print manual download instructions and exit' | |
| ) | |
| args = parser.parse_args() | |
| if args.manual_instructions: | |
| print_jao_manual_instructions() | |
| else: | |
| try: | |
| collector = JAOCollector() | |
| collector.collect_all_core_data( | |
| start_date=args.start_date, | |
| end_date=args.end_date, | |
| output_dir=args.output_dir | |
| ) | |
| except Exception as e: | |
| print(f"\n❌ Error: {e}\n") | |
| print_jao_manual_instructions() | |