"""JAO FBMC Data Collection using jao-py Python Library Collects FBMC (Flow-Based Market Coupling) data from JAO Publication Tool. Uses the jao-py Python package for API access. Data Available from JaoPublicationToolPandasClient: - Core FBMC Day-Ahead: From June 9, 2022 onwards Discovered Methods (17 total): 1. query_maxbex(day) - Maximum Bilateral Exchange (TARGET VARIABLE) 2. query_active_constraints(day) - Active CNECs with shadow prices/RAM 3. query_final_domain(mtu) - Final flowbased domain (PTDFs) 4. query_lta(d_from, d_to) - Long Term Allocations (LTN) 5. query_minmax_np(day) - Min/Max Net Positions 6. query_net_position(day) - Actual net positions 7. query_scheduled_exchange(d_from, d_to) - Scheduled exchanges 8. query_monitoring(day) - Monitoring data (may contain RAM/shadow prices) 9. query_allocationconstraint(d_from, d_to) - Allocation constraints 10. query_alpha_factor(d_from, d_to) - Alpha factors 11. query_d2cf(d_from, d_to) - Day-2 Cross Flow 12. query_initial_domain(mtu) - Initial domain 13. query_prefinal_domain(mtu) - Pre-final domain 14. query_price_spread(d_from, d_to) - Price spreads 15. query_refprog(d_from, d_to) - Reference program 16. query_status(d_from, d_to) - Status information 17. query_validations(d_from, d_to) - Validation data Documentation: https://github.com/fboerman/jao-py """ import polars as pl from pathlib import Path from datetime import datetime, timedelta from typing import Optional, List from tqdm import tqdm import pandas as pd try: from jao import JaoPublicationToolPandasClient except ImportError: raise ImportError( "jao-py not installed. Install with: uv pip install jao-py" ) class JAOCollector: """Collect FBMC data using jao-py Python library.""" def __init__(self): """Initialize JAO collector. Note: JaoPublicationToolPandasClient() takes no init parameters. """ self.client = JaoPublicationToolPandasClient() print("JAO Publication Tool Client initialized") print("Data available: Core FBMC from 2022-06-09 onwards") def _generate_date_range( self, start_date: str, end_date: str ) -> List[datetime]: """Generate list of business dates for data collection. Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: List of datetime objects """ start_dt = datetime.fromisoformat(start_date) end_dt = datetime.fromisoformat(end_date) dates = [] current = start_dt while current <= end_dt: dates.append(current) current += timedelta(days=1) return dates def collect_maxbex_sample( self, start_date: str, end_date: str, output_path: Path ) -> Optional[pl.DataFrame]: """Collect MaxBEX (Maximum Bilateral Exchange) data - TARGET VARIABLE. 

    def collect_maxbex_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect MaxBEX (Maximum Bilateral Exchange) data - TARGET VARIABLE.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with MaxBEX data
        """
        import time

        print("=" * 70)
        print("JAO MaxBEX Data Collection (TARGET VARIABLE)")
        print("=" * 70)

        dates = self._generate_date_range(start_date, end_date)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Total dates: {len(dates)}")
        print()

        all_data = []

        for date in tqdm(dates, desc="Collecting MaxBEX"):
            try:
                # Convert to pandas Timestamp with UTC timezone (required by jao-py)
                pd_date = pd.Timestamp(date, tz='UTC')

                # Query MaxBEX data
                df = self.client.query_maxbex(pd_date)

                if df is not None and not df.empty:
                    all_data.append(df)

                # Rate limiting: 5 seconds between requests
                time.sleep(5)

            except Exception as e:
                print(f"  Failed for {date.date()}: {e}")
                continue

        if all_data:
            # Combine all dataframes, keeping the original index, then reset it so
            # any mtu timestamps in the index survive the conversion to Polars
            combined_df = pd.concat(all_data, ignore_index=False).reset_index()

            # Convert to Polars
            pl_df = pl.from_pandas(combined_df)

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print()
            print("=" * 70)
            print("MaxBEX Collection Complete")
            print("=" * 70)
            print(f"Total records: {pl_df.shape[0]:,}")
            print(f"Columns: {pl_df.shape[1]}")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB")

            return pl_df
        else:
            print("No MaxBEX data collected")
            return None
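
    # Hedged sketch (not part of the original collection flow): a quick sanity
    # check on a previously collected MaxBEX Parquet file. The method name and
    # the idea of inspecting null counts are illustrative assumptions only.
    @staticmethod
    def summarize_maxbex_file(parquet_path: Path) -> Optional[pl.DataFrame]:
        """Print basic shape and null statistics for a saved MaxBEX Parquet file."""
        if not parquet_path.exists():
            print(f"File not found: {parquet_path}")
            return None
        df = pl.read_parquet(parquet_path)
        print(f"Rows: {df.shape[0]:,}  Columns: {df.shape[1]}")
        print(df.null_count())
        return df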

    def collect_cnec_ptdf_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect Active Constraints (CNECs + PTDFs in ONE call).

        Column Selection Strategy:
        - KEEP (25-26 columns):
            * Identifiers: tso, cnec_name, cnec_eic, direction, cont_name
            * Primary features: fmax, ram, shadow_price
            * PTDFs: ptdf_AT, ptdf_BE, ptdf_CZ, ptdf_DE, ptdf_FR, ptdf_HR,
              ptdf_HU, ptdf_NL, ptdf_PL, ptdf_RO, ptdf_SI, ptdf_SK
            * Additional features: fuaf, frm, ram_mcp, f0core, imax
            * Metadata: collection_date
        - DISCARD (14-17 columns):
            * Redundant: hubFrom, hubTo (derive during feature engineering)
            * Redundant with fuaf: f0all (r ≈ 0.99)
            * Intermediate: amr, cva, iva, min_ram_factor, max_z2_z_ptdf
            * Empty/separate source: lta_margin (100% zero, get from LTA dataset)
            * Too granular: ftotal_ltn, branch_eic, fref
            * Non-Core FBMC: ptdf_ALBE, ptdf_ALDE

        Data Transformations:
        - Shadow prices: keep original rounded to 2 decimals, add log(price + 1)
          rounded to 4 decimals
        - RAM: Clip to [0, fmax] range
        - PTDFs: Clip to [-1.5, +1.5] range, round to 4 decimals
        - Other floats: Round to 2 decimals (storage optimization)

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with CNEC and PTDF data
        """
        import time

        print("=" * 70)
        print("JAO Active Constraints Collection (CNECs + PTDFs)")
        print("=" * 70)

        dates = self._generate_date_range(start_date, end_date)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Total dates: {len(dates)}")
        print()

        all_data = []

        for date in tqdm(dates, desc="Collecting CNECs/PTDFs"):
            try:
                # Convert to pandas Timestamp with UTC timezone (required by jao-py)
                pd_date = pd.Timestamp(date, tz='UTC')

                # Query active constraints (includes CNECs + PTDFs!)
                df = self.client.query_active_constraints(pd_date)

                if df is not None and not df.empty:
                    # Add date column for reference
                    df['collection_date'] = date
                    all_data.append(df)

                # Rate limiting: 5 seconds between requests
                time.sleep(5)

            except Exception as e:
                print(f"  Failed for {date.date()}: {e}")
                continue

        if all_data:
            # Combine all dataframes
            combined_df = pd.concat(all_data, ignore_index=True)

            # Convert to Polars for efficient column operations
            pl_df = pl.from_pandas(combined_df)

            # --- DATA CLEANING & TRANSFORMATIONS ---

            # 1. Shadow Price: Log transform + round (NO clipping)
            if 'shadow_price' in pl_df.columns:
                pl_df = pl_df.with_columns([
                    # Keep original rounded to 2 decimals
                    pl.col('shadow_price').round(2).alias('shadow_price'),
                    # Add log-transformed version
                    (pl.col('shadow_price') + 1).log().round(4).alias('shadow_price_log')
                ])
                print("  [OK] Shadow price: log transform applied (no clipping)")

            # 2. RAM: Clip to [0, fmax] and round
            if 'ram' in pl_df.columns and 'fmax' in pl_df.columns:
                pl_df = pl_df.with_columns([
                    pl.when(pl.col('ram') < 0)
                    .then(0)
                    .when(pl.col('ram') > pl.col('fmax'))
                    .then(pl.col('fmax'))
                    .otherwise(pl.col('ram'))
                    .round(2)
                    .alias('ram')
                ])
                print("  [OK] RAM: clipped to [0, fmax] range")

            # 3. PTDFs: Clip to [-1.5, +1.5] and round to 4 decimals (precision needed)
            ptdf_cols = [col for col in pl_df.columns if col.startswith('ptdf_')]
            if ptdf_cols:
                pl_df = pl_df.with_columns([
                    pl.col(col).clip(-1.5, 1.5).round(4).alias(col)
                    for col in ptdf_cols
                ])
                print(f"  [OK] PTDFs: {len(ptdf_cols)} columns clipped to [-1.5, +1.5]")

            # 4. Other float columns: Round to 2 decimals
            #    (exclude shadow_price_log to preserve its 4-decimal precision)
            float_cols = [
                col for col in pl_df.columns
                if pl_df[col].dtype in [pl.Float64, pl.Float32]
                and col not in ['shadow_price', 'shadow_price_log', 'ram'] + ptdf_cols
            ]
            if float_cols:
                pl_df = pl_df.with_columns([
                    pl.col(col).round(2).alias(col)
                    for col in float_cols
                ])
                print(f"  [OK] Other floats: {len(float_cols)} columns rounded to 2 decimals")

            # --- COLUMN SELECTION ---

            # Define columns to keep
            keep_cols = [
                # Identifiers
                'tso', 'cnec_name', 'cnec_eic', 'direction', 'cont_name',
                # Primary features
                'fmax', 'ram', 'shadow_price', 'shadow_price_log',
                # Additional features
                'fuaf', 'frm', 'ram_mcp', 'f0core', 'imax',
                # PTDFs (all Core FBMC zones)
                'ptdf_AT', 'ptdf_BE', 'ptdf_CZ', 'ptdf_DE', 'ptdf_FR', 'ptdf_HR',
                'ptdf_HU', 'ptdf_NL', 'ptdf_PL', 'ptdf_RO', 'ptdf_SI', 'ptdf_SK',
                # Metadata
                'collection_date'
            ]

            # Filter to only columns that exist in the dataframe
            existing_keep_cols = [col for col in keep_cols if col in pl_df.columns]
            discarded_cols = [col for col in pl_df.columns if col not in existing_keep_cols]

            # Select only kept columns
            pl_df = pl_df.select(existing_keep_cols)

            print()
            print(f"  [OK] Column selection: {len(existing_keep_cols)} kept, "
                  f"{len(discarded_cols)} discarded")
            if discarded_cols:
                print(f"    Discarded: {', '.join(sorted(discarded_cols)[:10])}...")

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print()
            print("=" * 70)
            print("CNEC/PTDF Collection Complete")
            print("=" * 70)
            print(f"Total records: {pl_df.shape[0]:,}")
            print(f"Columns: {pl_df.shape[1]} ({len(existing_keep_cols)} kept)")
            print("CNEC fields: tso, cnec_name, cnec_eic, direction, shadow_price")
            print("Features: fmax, ram, fuaf, frm, shadow_price_log")
            print("PTDF fields: ptdf_AT, ptdf_BE, ptdf_CZ, ptdf_DE, ptdf_FR, etc.")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")

            return pl_df
        else:
            print("No CNEC/PTDF data collected")
            return None
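
    # Hedged helper sketch: several collectors below repeat the same "round every
    # float column" Polars pattern. This static method is not wired into them; it
    # only illustrates how that step could be factored out.
    @staticmethod
    def _round_float_columns(df: pl.DataFrame, decimals: int = 2) -> pl.DataFrame:
        """Round every Float32/Float64 column of ``df`` to ``decimals`` places."""
        float_cols = [
            col for col in df.columns
            if df[col].dtype in (pl.Float64, pl.Float32)
        ]
        if not float_cols:
            return df
        return df.with_columns([pl.col(col).round(decimals) for col in float_cols])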

    def collect_lta_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect LTA (Long Term Allocation) data - separate from CNEC data.

        Note: lta_margin in CNEC data is 100% zero under the Extended LTA approach.
        This method collects actual LTA allocations from the dedicated LTA publication.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with LTA data
        """
        print("=" * 70)
        print("JAO LTA Data Collection (Long Term Allocations)")
        print("=" * 70)

        # LTA query uses a date range, not individual days
        print(f"Date range: {start_date} to {end_date}")
        print()

        try:
            # Convert to pandas Timestamps with UTC timezone
            pd_start = pd.Timestamp(start_date, tz='UTC')
            pd_end = pd.Timestamp(end_date, tz='UTC')

            # Query LTA data for the entire period
            print("Querying LTA data...")
            df = self.client.query_lta(pd_start, pd_end)

            if df is not None and not df.empty:
                # Convert to Polars
                pl_df = pl.from_pandas(df)

                # Round float columns to 2 decimals
                float_cols = [
                    col for col in pl_df.columns
                    if pl_df[col].dtype in [pl.Float64, pl.Float32]
                ]
                if float_cols:
                    pl_df = pl_df.with_columns([
                        pl.col(col).round(2).alias(col)
                        for col in float_cols
                    ])

                # Save to parquet
                output_path.parent.mkdir(parents=True, exist_ok=True)
                pl_df.write_parquet(output_path)

                print()
                print("=" * 70)
                print("LTA Collection Complete")
                print("=" * 70)
                print(f"Total records: {pl_df.shape[0]:,}")
                print(f"Columns: {pl_df.shape[1]}")
                print(f"Output: {output_path}")
                print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")

                return pl_df
            else:
                print("⚠️ No LTA data available for this period")
                return None

        except Exception as e:
            print(f"❌ LTA collection failed: {e}")
            print("   This may be expected if LTA data is not published for this period")
            return None

    def collect_net_positions_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect Net Position bounds (Min/Max) for Core FBMC zones.

        Net positions define the domain boundaries for each bidding zone and are
        essential for understanding feasible commercial exchange patterns.

        Implements JAO API rate limiting:
        - 100 requests/minute limit
        - 1 second between requests (60 req/min with safety margin)
        - Exponential backoff on 429 errors

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with net position data
        """
        import time
        from requests.exceptions import HTTPError

        print("=" * 70)
        print("JAO Net Position Data Collection (Min/Max Bounds)")
        print("=" * 70)

        dates = self._generate_date_range(start_date, end_date)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Total dates: {len(dates)}")
        print("Rate limiting: 1s between requests, exponential backoff on 429")
        print()

        all_data = []
        failed_dates = []

        for date in tqdm(dates, desc="Collecting Net Positions"):
            # Retry logic with exponential backoff
            max_retries = 5
            base_delay = 60  # Start with 60s on a 429 error

            for attempt in range(max_retries):
                try:
                    # Rate limiting: 1 second between all requests
                    time.sleep(1)

                    # Convert to pandas Timestamp with UTC timezone
                    pd_date = pd.Timestamp(date, tz='UTC')

                    # Query min/max net positions
                    df = self.client.query_minmax_np(pd_date)

                    if df is not None and not df.empty:
                        # CRITICAL: Reset index to preserve mtu timestamps
                        # Net positions have hourly 'mtu' timestamps in the index
                        df_with_index = df.reset_index()

                        # Add date column for reference
                        df_with_index['collection_date'] = date
                        all_data.append(df_with_index)

                    break  # Success - exit retry loop

                except HTTPError as e:
                    if e.response is not None and e.response.status_code == 429:
                        # Rate limited - exponential backoff
                        wait_time = base_delay * (2 ** attempt)
                        if attempt < max_retries - 1:
                            time.sleep(wait_time)
                        else:
                            failed_dates.append((date, "429 after retries"))
                    else:
                        # Other HTTP error - don't retry
                        failed_dates.append((date, str(e)))
                        break

                except Exception as e:
                    # Non-HTTP error
                    failed_dates.append((date, str(e)))
                    break

        # Report results
        print()
        print("=" * 70)
        print("Net Position Collection Complete")
        print("=" * 70)
        print(f"Success: {len(all_data)}/{len(dates)} dates")

        if failed_dates:
            print(f"Failed: {len(failed_dates)} dates")
            if len(failed_dates) <= 10:
                for date, error in failed_dates:
                    print(f"  {date.date()}: {error}")
            else:
                print("  First 10 failures:")
                for date, error in failed_dates[:10]:
                    print(f"  {date.date()}: {error}")

        if all_data:
            # Combine all dataframes
            combined_df = pd.concat(all_data, ignore_index=True)

            # Convert to Polars
            pl_df = pl.from_pandas(combined_df)

            # Round float columns to 2 decimals
            float_cols = [
                col for col in pl_df.columns
                if pl_df[col].dtype in [pl.Float64, pl.Float32]
            ]
            if float_cols:
                pl_df = pl_df.with_columns([
                    pl.col(col).round(2).alias(col)
                    for col in float_cols
                ])

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print()
            print(f"Total records: {pl_df.shape[0]:,}")
            print(f"Columns: {pl_df.shape[1]}")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")
            print("=" * 70)

            return pl_df
        else:
            print("\n[WARNING] No Net Position data collected")
            print("=" * 70)
            return None
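
    # Hedged sketch: the retry/backoff pattern implemented inline above, factored
    # into a reusable helper. It is not called by the existing collectors; it only
    # illustrates how the 429 handling could be reused for other query_* calls.
    @staticmethod
    def _query_with_backoff(query_fn, *args, max_retries: int = 5,
                            base_delay: float = 60.0, pause: float = 1.0):
        """Call ``query_fn(*args)``, retrying with exponential backoff on HTTP 429."""
        import time
        from requests.exceptions import HTTPError

        for attempt in range(max_retries):
            time.sleep(pause)  # simple spacing between requests
            try:
                return query_fn(*args)
            except HTTPError as e:
                status = getattr(e.response, "status_code", None)
                if status == 429 and attempt < max_retries - 1:
                    # Rate limited: wait 60s, 120s, 240s, ... before retrying
                    time.sleep(base_delay * (2 ** attempt))
                else:
                    raise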

    def collect_external_atc_sample(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect ATC (Available Transfer Capacity) for external (non-Core) borders.

        External borders connect Core FBMC to non-Core zones (e.g., FR-UK, DE-CH, PL-SE).
        These capacities affect loop flows and provide context for Core network loading.

        NOTE: This method needs to be implemented once the correct JAO API endpoint
        for external ATC is identified. Possible sources:
        - JAO ATC publications (separate from Core FBMC)
        - ENTSO-E Transparency Platform (Forecasted/Offered Capacity)
        - Bilateral capacity publications

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with external ATC data
        """
        print("=" * 70)
        print("JAO External ATC Data Collection (Non-Core Borders)")
        print("=" * 70)
        print("[WARN] IMPLEMENTATION PENDING - Need to identify correct API endpoint")
        print()

        # TODO: Research the correct JAO API method for external ATC
        # Candidates:
        #   1. JAO ATC-specific publications (if they exist)
        #   2. ENTSO-E Transparency API (Forecasted Transfer Capacities)
        #   3. Bilateral capacity allocations from TSO websites

        # External borders of interest (14 borders × 2 directions = 28):
        #   FR-UK, FR-ES, FR-CH, FR-IT
        #   DE-CH, DE-DK1, DE-DK2, DE-NO2, DE-SE4
        #   PL-SE4, PL-UA
        #   CZ-UA
        #   RO-UA, RO-MD

        # For now, return None and document that this needs implementation
        print("External ATC collection not yet implemented.")
        print("Potential data sources:")
        print("  1. ENTSO-E Transparency API: Forecasted Transfer Capacities (Day Ahead)")
        print("  2. JAO bilateral capacity publications")
        print("  3. TSO-specific capacity publications")
        print()
        print("Recommendation: Collect from ENTSO-E API for consistency")
        print("=" * 70)

        return None

    def collect_final_domain_dense(
        self,
        start_date: str,
        end_date: str,
        target_cnec_eics: list[str],
        output_path: Path,
        use_mirror: bool = True
    ) -> Optional[pl.DataFrame]:
        """Collect DENSE CNEC time series for specific CNECs from the Final Domain.

        Phase 2 collection method: gets the complete hourly time series for target
        CNECs (binding AND non-binding states) to enable time-series feature
        engineering.

        This method queries the JAO Final Domain publication, which contains ALL
        CNECs for each hour (DENSE format), not just active/binding constraints.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            target_cnec_eics: List of CNEC EIC codes to collect
                (e.g., 200 critical CNECs from Phase 1)
            output_path: Path to save Parquet file
            use_mirror: Use mirror.flowbased.eu for faster bulk downloads (recommended)

        Returns:
            Polars DataFrame with DENSE CNEC time series data

        Data Structure:
        - DENSE format: each CNEC appears every hour (binding or not)
        - Columns: mtu (timestamp), tso, cnec_name, cnec_eic, direction, presolved,
          ram, fmax, shadow_price, frm, fuaf, ptdf_AT, ptdf_BE, ..., ptdf_SK
        - presolved field: True = binding, False = redundant (non-binding)
        - Non-binding hours: shadow_price = 0, ram = fmax

        Notes:
        - Mirror method is MUCH faster: 1 request/day vs 24 requests/day
        - Cannot filter by EIC on the server side - downloads all CNECs, then
          filters locally
        - For 200 CNECs × 24 months: ~3.5M records (~100-150 MB compressed)
        """
        import time

        print("=" * 70)
        print("JAO Final Domain DENSE CNEC Collection (Phase 2)")
        print("=" * 70)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Target CNECs: {len(target_cnec_eics)}")
        print(f"Method: {'Mirror (bulk daily)' if use_mirror else 'Hourly API calls'}")
        print()

        dates = self._generate_date_range(start_date, end_date)
        print(f"Total dates: {len(dates)}")
        print(
            f"Expected records: {len(target_cnec_eics)} CNECs × {len(dates) * 24} hours "
            f"= {len(target_cnec_eics) * len(dates) * 24:,}"
        )
        print()

        all_data = []

        for date in tqdm(dates, desc="Collecting Final Domain"):
            try:
                # Convert to pandas Timestamp in the Europe/Amsterdam market timezone
                pd_date = pd.Timestamp(date, tz='Europe/Amsterdam')

                # Query Final Domain for the first hour of the day.
                # If use_mirror=True, this returns the entire day (24 hours) at once.
                df = self.client.query_final_domain(
                    mtu=pd_date,
                    presolved=None,  # ALL CNECs (binding + non-binding) = DENSE!
                    use_mirror=use_mirror
                )

                if df is not None and not df.empty:
                    # Filter to target CNECs only (local filtering)
                    df_filtered = df[df['cnec_eic'].isin(target_cnec_eics)].copy()

                    if not df_filtered.empty:
                        # Add collection date for reference
                        df_filtered['collection_date'] = date
                        all_data.append(df_filtered)

                # Rate limiting for non-mirror mode
                if not use_mirror:
                    time.sleep(1)  # 1 second between requests

            except Exception as e:
                print(f"  Failed for {date.date()}: {e}")
                continue

        if all_data:
            # Combine all dataframes
            combined_df = pd.concat(all_data, ignore_index=True)

            # Convert to Polars
            pl_df = pl.from_pandas(combined_df)

            # Validate DENSE structure
            unique_cnecs = pl_df['cnec_eic'].n_unique()
            unique_hours = pl_df['mtu'].n_unique()
            expected_records = unique_cnecs * unique_hours
            actual_records = len(pl_df)

            print()
            print("=" * 70)
            print("Final Domain DENSE Collection Complete")
            print("=" * 70)
            print(f"Total records: {actual_records:,}")
            print(f"Unique CNECs: {unique_cnecs}")
            print(f"Unique hours: {unique_hours}")
            print(f"Expected (DENSE): {expected_records:,}")

            if actual_records == expected_records:
                print("[OK] DENSE structure validated - all CNECs present every hour")
            else:
                print(f"[WARN] Structure is SPARSE! Missing {expected_records - actual_records:,} records")
                print("   Some CNECs may be missing for some hours")

            # Round float columns to 4 decimals (higher precision for PTDFs)
            float_cols = [
                col for col in pl_df.columns
                if pl_df[col].dtype in [pl.Float64, pl.Float32]
            ]
            if float_cols:
                pl_df = pl_df.with_columns([
                    pl.col(col).round(4).alias(col)
                    for col in float_cols
                ])

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print(f"Columns: {pl_df.shape[1]}")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")
            print("=" * 70)

            return pl_df
        else:
            print("No Final Domain data collected")
            return None

    def collect_cnec_data(
        self,
        start_date: str,
        end_date: str,
        output_path: Path
    ) -> Optional[pl.DataFrame]:
        """Collect CNEC (Critical Network Elements with Contingencies) data.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_path: Path to save Parquet file

        Returns:
            Polars DataFrame with CNEC data
        """
        print("=" * 70)
        print("JAO CNEC Data Collection")
        print("=" * 70)

        dates = self._generate_date_range(start_date, end_date)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Total dates: {len(dates)}")
        print()

        all_data = []

        for date in tqdm(dates, desc="Collecting CNEC data"):
            try:
                # Get CNEC data for this date
                # Note: Exact method name needs to be verified from the jao-py source
                df = self.client.query_cnec(date)

                if df is not None and not df.empty:
                    # Add date column
                    df['collection_date'] = date
                    all_data.append(df)

            except Exception as e:
                print(f"  ⚠️ Failed for {date.date()}: {e}")
                continue

        if all_data:
            # Combine all dataframes
            combined_df = pd.concat(all_data, ignore_index=True)

            # Convert to Polars
            pl_df = pl.from_pandas(combined_df)

            # Save to parquet
            output_path.parent.mkdir(parents=True, exist_ok=True)
            pl_df.write_parquet(output_path)

            print()
            print("=" * 70)
            print("CNEC Collection Complete")
            print("=" * 70)
            print(f"Total records: {pl_df.shape[0]:,}")
            print(f"Columns: {pl_df.shape[1]}")
            print(f"Output: {output_path}")
            print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB")

            return pl_df
        else:
            print("❌ No CNEC data collected")
            return None

    def collect_all_core_data(
        self,
        start_date: str,
        end_date: str,
        output_dir: Path
    ) -> dict:
        """Collect all available Core FBMC data.

        This method will be expanded as we discover available methods in jao-py.

        Args:
            start_date: Start date (YYYY-MM-DD)
            end_date: End date (YYYY-MM-DD)
            output_dir: Directory to save Parquet files

        Returns:
            Dictionary with paths to saved files
        """
        output_dir.mkdir(parents=True, exist_ok=True)

        print("=" * 70)
        print("JAO Core FBMC Data Collection")
        print("=" * 70)
        print(f"Date range: {start_date} to {end_date}")
        print(f"Output directory: {output_dir}")
        print()

        results = {}

        # Note: The jao-py documentation is sparse.
        # We'll need to explore the client methods to find what's available.
        # Common methods might include:
        #   - query_cnec()
        #   - query_ptdf()
        #   - query_ram()
        #   - query_shadow_prices()
        #   - query_net_positions()

        print("⚠️ Note: jao-py has limited documentation.")
        print("   Available methods need to be discovered from source code.")
        print("   See: https://github.com/fboerman/jao-py")
        print()

        # Try to collect CNECs (if the method exists)
        try:
            cnec_path = output_dir / "jao_cnec_2024_2025.parquet"
            cnec_df = self.collect_cnec_data(start_date, end_date, cnec_path)
            if cnec_df is not None:
                results['cnec'] = cnec_path
        except AttributeError as e:
            print(f"⚠️ CNEC collection not available: {e}")
            print("   Check the jao-py source for correct method names")

        # Placeholder for additional data types
        # These will be implemented as we discover the correct methods

        print()
        print("=" * 70)
        print("JAO Collection Summary")
        print("=" * 70)
        print(f"Files created: {len(results)}")
        for data_type, path in results.items():
            file_size = path.stat().st_size / (1024**2)
            print(f"  - {data_type}: {file_size:.1f} MB")

        if not results:
            print()
            print("⚠️ No data collected. This likely means:")
            print("   1. The date range is outside available data (before 2022-06-09)")
            print("   2. The jao-py methods need to be discovered from source code")
            print("   3. Alternative: Manual download from https://publicationtool.jao.eu/core/")

        return results
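

# Hedged usage sketch (not part of the collector API): preview a single day of
# MaxBEX and active-constraint data before launching a full collection run.
# Assumes network access to the JAO publication tool and a date on or after
# 2022-06-09; the function name and the chosen day are illustrative only.
def _example_single_day_preview(day: str = "2024-10-01") -> None:
    client = JaoPublicationToolPandasClient()
    ts = pd.Timestamp(day, tz="UTC")
    maxbex = client.query_maxbex(ts)
    constraints = client.query_active_constraints(ts)
    print(f"MaxBEX frame shape: {maxbex.shape}")
    print(f"Active constraints frame shape: {constraints.shape}")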


def print_jao_manual_instructions():
    """Print manual download instructions for JAO data."""
    print("""
╔══════════════════════════════════════════════════════════════════════════╗
║                       JAO DATA ACCESS INSTRUCTIONS                        ║
╚══════════════════════════════════════════════════════════════════════════╝

Option 1: Use the jao-py Python Library (Recommended)
------------------------------------------------------
Installed: ✅ jao-py 0.6.2

Available clients:
- JaoPublicationToolPandasClient (Core Day-Ahead, from 2022-06-09)
- JaoPublicationToolPandasIntraDay (Core Intraday, from 2024-05-29)
- JaoPublicationToolPandasNordics (Nordic, from 2024-10-30)

Documentation: https://github.com/fboerman/jao-py

Note: jao-py has sparse documentation. Method discovery required:
1. Explore the source code: https://github.com/fboerman/jao-py
2. Check available methods: dir(client)
3. Inspect method signatures: help(client.method_name)

Option 2: Manual Download from the JAO Website
-----------------------------------------------
1. Visit: https://publicationtool.jao.eu/core/
2. Navigate to data sections:
   - CNECs (Critical Network Elements)
   - PTDFs (Power Transfer Distribution Factors)
   - RAMs (Remaining Available Margins)
   - Shadow Prices
   - Net Positions
3. Select date range: Oct 2024 - Sept 2025
4. Download format: CSV or Excel
5. Save files to: data/raw/
6. File naming convention:
   - jao_cnec_2024-10_2025-09.csv
   - jao_ptdf_2024-10_2025-09.csv
   - jao_ram_2024-10_2025-09.csv
7. Convert to Parquet (we can add a converter script if needed)

Option 3: R Package JAOPuTo (Alternative)
------------------------------------------
If you have R installed:

```r
install.packages("devtools")
devtools::install_github("nicoschoutteet/JAOPuTo")
# Then export data to CSV for Python ingestion
```

Option 4: Contact JAO Support
------------------------------
Email: info@jao.eu
Subject: Bulk FBMC data download for research
Request: Core FBMC data, Oct 2024 - Sept 2025

════════════════════════════════════════════════════════════════════════════
""")
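

# Hedged sketch of the converter mentioned in Option 2, step 7 above: turn a
# manually downloaded JAO CSV export into Parquet. Column names and dtypes are
# left to Polars' inference; the function name and defaults are illustrative
# assumptions, not an existing part of this module's workflow.
def csv_to_parquet(csv_path: Path, parquet_path: Optional[Path] = None) -> Path:
    """Convert a manually downloaded JAO CSV export to a Parquet file."""
    if parquet_path is None:
        parquet_path = csv_path.with_suffix(".parquet")
    df = pl.read_csv(csv_path, try_parse_dates=True)
    parquet_path.parent.mkdir(parents=True, exist_ok=True)
    df.write_parquet(parquet_path)
    print(f"Wrote {df.shape[0]:,} rows × {df.shape[1]} columns to {parquet_path}")
    return parquet_path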


if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(description="Collect JAO FBMC data using jao-py")
    parser.add_argument(
        '--start-date',
        default='2024-10-01',
        help='Start date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--end-date',
        default='2025-09-30',
        help='End date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        default=Path('data/raw'),
        help='Output directory for Parquet files'
    )
    parser.add_argument(
        '--manual-instructions',
        action='store_true',
        help='Print manual download instructions and exit'
    )

    args = parser.parse_args()

    if args.manual_instructions:
        print_jao_manual_instructions()
    else:
        try:
            collector = JAOCollector()
            collector.collect_all_core_data(
                start_date=args.start_date,
                end_date=args.end_date,
                output_dir=args.output_dir
            )
        except Exception as e:
            print(f"\n❌ Error: {e}\n")
            print_jao_manual_instructions()
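
# Example invocations (illustrative; assumes this module is saved as, e.g.,
# collect_jao.py and run from the project root):
#   python collect_jao.py --manual-instructions
#   python collect_jao.py --start-date 2024-10-01 --end-date 2024-10-07 --output-dir data/raw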