Evgueni Poloukarov
feat: complete Phase 1 ENTSO-E asset-specific outage validation
27cb60a
"""JAO FBMC Data Collection using jao-py Python Library
Collects FBMC (Flow-Based Market Coupling) data from JAO Publication Tool.
Uses the jao-py Python package for API access.
Data Available from JaoPublicationToolPandasClient:
- Core FBMC Day-Ahead: From June 9, 2022 onwards
Discovered Methods (17 total):
1. query_maxbex(day) - Maximum Bilateral Exchange (TARGET VARIABLE)
2. query_active_constraints(day) - Active CNECs with shadow prices/RAM
3. query_final_domain(mtu) - Final flowbased domain (PTDFs)
4. query_lta(d_from, d_to) - Long Term Allocations (LTN)
5. query_minmax_np(day) - Min/Max Net Positions
6. query_net_position(day) - Actual net positions
7. query_scheduled_exchange(d_from, d_to) - Scheduled exchanges
8. query_monitoring(day) - Monitoring data (may contain RAM/shadow prices)
9. query_allocationconstraint(d_from, d_to) - Allocation constraints
10. query_alpha_factor(d_from, d_to) - Alpha factors
11. query_d2cf(d_from, d_to) - Day-2 Cross Flow
12. query_initial_domain(mtu) - Initial domain
13. query_prefinal_domain(mtu) - Pre-final domain
14. query_price_spread(d_from, d_to) - Price spreads
15. query_refprog(d_from, d_to) - Reference program
16. query_status(d_from, d_to) - Status information
17. query_validations(d_from, d_to) - Validation data
Documentation: https://github.com/fboerman/jao-py
"""
import polars as pl
from pathlib import Path
from datetime import datetime, timedelta
from typing import Optional, List
from tqdm import tqdm
import pandas as pd
try:
from jao import JaoPublicationToolPandasClient
except ImportError:
raise ImportError(
"jao-py not installed. Install with: uv pip install jao-py"
)
class JAOCollector:
"""Collect FBMC data using jao-py Python library."""
def __init__(self):
"""Initialize JAO collector.
Note: JaoPublicationToolPandasClient() takes no init parameters.
"""
self.client = JaoPublicationToolPandasClient()
print("JAO Publication Tool Client initialized")
print("Data available: Core FBMC from 2022-06-09 onwards")
def _generate_date_range(
self,
start_date: str,
end_date: str
) -> List[datetime]:
"""Generate list of business dates for data collection.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
Returns:
List of datetime objects
"""
start_dt = datetime.fromisoformat(start_date)
end_dt = datetime.fromisoformat(end_date)
dates = []
current = start_dt
while current <= end_dt:
dates.append(current)
current += timedelta(days=1)
return dates
def collect_maxbex_sample(
self,
start_date: str,
end_date: str,
output_path: Path
) -> Optional[pl.DataFrame]:
"""Collect MaxBEX (Maximum Bilateral Exchange) data - TARGET VARIABLE.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_path: Path to save Parquet file
Returns:
Polars DataFrame with MaxBEX data
"""
import time
print("=" * 70)
print("JAO MaxBEX Data Collection (TARGET VARIABLE)")
print("=" * 70)
dates = self._generate_date_range(start_date, end_date)
print(f"Date range: {start_date} to {end_date}")
print(f"Total dates: {len(dates)}")
print()
all_data = []
for date in tqdm(dates, desc="Collecting MaxBEX"):
try:
# Convert to pandas Timestamp with UTC timezone (required by jao-py)
pd_date = pd.Timestamp(date, tz='UTC')
# Query MaxBEX data
df = self.client.query_maxbex(pd_date)
if df is not None and not df.empty:
all_data.append(df)
# Rate limiting: 5 seconds between requests
time.sleep(5)
except Exception as e:
print(f" Failed for {date.date()}: {e}")
continue
if all_data:
# Combine all dataframes
combined_df = pd.concat(all_data, ignore_index=False)
# Convert to Polars
pl_df = pl.from_pandas(combined_df)
# Save to parquet
output_path.parent.mkdir(parents=True, exist_ok=True)
pl_df.write_parquet(output_path)
print()
print("=" * 70)
print("MaxBEX Collection Complete")
print("=" * 70)
print(f"Total records: {pl_df.shape[0]:,}")
print(f"Columns: {pl_df.shape[1]}")
print(f"Output: {output_path}")
print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB")
return pl_df
else:
print("No MaxBEX data collected")
return None
def collect_cnec_ptdf_sample(
self,
start_date: str,
end_date: str,
output_path: Path
) -> Optional[pl.DataFrame]:
"""Collect Active Constraints (CNECs + PTDFs in ONE call).
Column Selection Strategy:
- KEEP (25-26 columns):
* Identifiers: tso, cnec_name, cnec_eic, direction, cont_name
* Primary features: fmax, ram, shadow_price
* PTDFs: ptdf_AT, ptdf_BE, ptdf_CZ, ptdf_DE, ptdf_FR, ptdf_HR,
ptdf_HU, ptdf_NL, ptdf_PL, ptdf_RO, ptdf_SI, ptdf_SK
* Additional features: fuaf, frm, ram_mcp, f0core, imax
* Metadata: collection_date
- DISCARD (14-17 columns):
* Redundant: hubFrom, hubTo (derive during feature engineering)
* Redundant with fuaf: f0all (r≈0.99)
* Intermediate: amr, cva, iva, min_ram_factor, max_z2_z_ptdf
* Empty/separate source: lta_margin (100% zero, get from LTA dataset)
* Too granular: ftotal_ltn, branch_eic, fref
* Non-Core FBMC: ptdf_ALBE, ptdf_ALDE
Data Transformations:
- Shadow prices: Log transform log(price + 1), round to 2 decimals
- RAM: Clip to [0, fmax] range
- PTDFs: Clip to [-1.5, +1.5] range
- All floats: Round to 2 decimals (storage optimization)
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_path: Path to save Parquet file
Returns:
Polars DataFrame with CNEC and PTDF data
"""
import time
import numpy as np
print("=" * 70)
print("JAO Active Constraints Collection (CNECs + PTDFs)")
print("=" * 70)
dates = self._generate_date_range(start_date, end_date)
print(f"Date range: {start_date} to {end_date}")
print(f"Total dates: {len(dates)}")
print()
all_data = []
for date in tqdm(dates, desc="Collecting CNECs/PTDFs"):
try:
# Convert to pandas Timestamp with UTC timezone (required by jao-py)
pd_date = pd.Timestamp(date, tz='UTC')
# Query active constraints (includes CNECs + PTDFs!)
df = self.client.query_active_constraints(pd_date)
if df is not None and not df.empty:
# Add date column for reference
df['collection_date'] = date
all_data.append(df)
# Rate limiting: 5 seconds between requests
time.sleep(5)
except Exception as e:
print(f" Failed for {date.date()}: {e}")
continue
if all_data:
# Combine all dataframes
combined_df = pd.concat(all_data, ignore_index=True)
# Convert to Polars for efficient column operations
pl_df = pl.from_pandas(combined_df)
# --- DATA CLEANING & TRANSFORMATIONS ---
# 1. Shadow Price: Log transform + round (NO clipping)
if 'shadow_price' in pl_df.columns:
pl_df = pl_df.with_columns([
# Keep original rounded to 2 decimals
pl.col('shadow_price').round(2).alias('shadow_price'),
# Add log-transformed version
(pl.col('shadow_price') + 1).log().round(4).alias('shadow_price_log')
])
print(" [OK] Shadow price: log transform applied (no clipping)")
# 2. RAM: Clip to [0, fmax] and round
if 'ram' in pl_df.columns and 'fmax' in pl_df.columns:
pl_df = pl_df.with_columns([
pl.when(pl.col('ram') < 0)
.then(0)
.when(pl.col('ram') > pl.col('fmax'))
.then(pl.col('fmax'))
.otherwise(pl.col('ram'))
.round(2)
.alias('ram')
])
print(" [OK] RAM: clipped to [0, fmax] range")
# 3. PTDFs: Clip to [-1.5, +1.5] and round to 4 decimals (precision needed)
ptdf_cols = [col for col in pl_df.columns if col.startswith('ptdf_')]
if ptdf_cols:
pl_df = pl_df.with_columns([
pl.col(col).clip(-1.5, 1.5).round(4).alias(col)
for col in ptdf_cols
])
print(f" [OK] PTDFs: {len(ptdf_cols)} columns clipped to [-1.5, +1.5]")
# 4. Other float columns: Round to 2 decimals
float_cols = [col for col in pl_df.columns
if pl_df[col].dtype in [pl.Float64, pl.Float32]
and col not in ['shadow_price', 'ram'] + ptdf_cols]
if float_cols:
pl_df = pl_df.with_columns([
pl.col(col).round(2).alias(col)
for col in float_cols
])
print(f" [OK] Other floats: {len(float_cols)} columns rounded to 2 decimals")
# --- COLUMN SELECTION ---
# Define columns to keep
keep_cols = [
# Identifiers
'tso', 'cnec_name', 'cnec_eic', 'direction', 'cont_name',
# Primary features
'fmax', 'ram', 'shadow_price', 'shadow_price_log',
# Additional features
'fuaf', 'frm', 'ram_mcp', 'f0core', 'imax',
# PTDFs (all Core FBMC zones)
'ptdf_AT', 'ptdf_BE', 'ptdf_CZ', 'ptdf_DE', 'ptdf_FR', 'ptdf_HR',
'ptdf_HU', 'ptdf_NL', 'ptdf_PL', 'ptdf_RO', 'ptdf_SI', 'ptdf_SK',
# Metadata
'collection_date'
]
# Filter to only columns that exist in the dataframe
existing_keep_cols = [col for col in keep_cols if col in pl_df.columns]
discarded_cols = [col for col in pl_df.columns if col not in existing_keep_cols]
# Select only kept columns
pl_df = pl_df.select(existing_keep_cols)
print()
print(f" [OK] Column selection: {len(existing_keep_cols)} kept, {len(discarded_cols)} discarded")
if discarded_cols:
print(f" Discarded: {', '.join(sorted(discarded_cols)[:10])}...")
# Save to parquet
output_path.parent.mkdir(parents=True, exist_ok=True)
pl_df.write_parquet(output_path)
print()
print("=" * 70)
print("CNEC/PTDF Collection Complete")
print("=" * 70)
print(f"Total records: {pl_df.shape[0]:,}")
print(f"Columns: {pl_df.shape[1]} ({len(existing_keep_cols)} kept)")
print(f"CNEC fields: tso, cnec_name, cnec_eic, direction, shadow_price")
print(f"Features: fmax, ram, fuaf, frm, shadow_price_log")
print(f"PTDF fields: ptdf_AT, ptdf_BE, ptdf_CZ, ptdf_DE, ptdf_FR, etc.")
print(f"Output: {output_path}")
print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")
return pl_df
else:
print("No CNEC/PTDF data collected")
return None
def collect_lta_sample(
self,
start_date: str,
end_date: str,
output_path: Path
) -> Optional[pl.DataFrame]:
"""Collect LTA (Long Term Allocation) data - separate from CNEC data.
Note: lta_margin in CNEC data is 100% zero under Extended LTA approach.
This method collects actual LTA allocations from dedicated LTA publication.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_path: Path to save Parquet file
Returns:
Polars DataFrame with LTA data
"""
import time
print("=" * 70)
print("JAO LTA Data Collection (Long Term Allocations)")
print("=" * 70)
# LTA query uses date range, not individual days
print(f"Date range: {start_date} to {end_date}")
print()
try:
# Convert to pandas Timestamps with UTC timezone
pd_start = pd.Timestamp(start_date, tz='UTC')
pd_end = pd.Timestamp(end_date, tz='UTC')
# Query LTA data for the entire period
print("Querying LTA data...")
df = self.client.query_lta(pd_start, pd_end)
if df is not None and not df.empty:
# Convert to Polars
pl_df = pl.from_pandas(df)
# Round float columns to 2 decimals
float_cols = [col for col in pl_df.columns
if pl_df[col].dtype in [pl.Float64, pl.Float32]]
if float_cols:
pl_df = pl_df.with_columns([
pl.col(col).round(2).alias(col)
for col in float_cols
])
# Save to parquet
output_path.parent.mkdir(parents=True, exist_ok=True)
pl_df.write_parquet(output_path)
print()
print("=" * 70)
print("LTA Collection Complete")
print("=" * 70)
print(f"Total records: {pl_df.shape[0]:,}")
print(f"Columns: {pl_df.shape[1]}")
print(f"Output: {output_path}")
print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")
return pl_df
else:
print("⚠️ No LTA data available for this period")
return None
except Exception as e:
print(f"❌ LTA collection failed: {e}")
print(" This may be expected if LTA data is not published for this period")
return None
def collect_net_positions_sample(
self,
start_date: str,
end_date: str,
output_path: Path
) -> Optional[pl.DataFrame]:
"""Collect Net Position bounds (Min/Max) for Core FBMC zones.
Net positions define the domain boundaries for each bidding zone.
Essential for understanding feasible commercial exchange patterns.
Implements JAO API rate limiting:
- 100 requests/minute limit
- 1 second between requests (60 req/min with safety margin)
- Exponential backoff on 429 errors
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_path: Path to save Parquet file
Returns:
Polars DataFrame with net position data
"""
import time
from requests.exceptions import HTTPError
print("=" * 70)
print("JAO Net Position Data Collection (Min/Max Bounds)")
print("=" * 70)
dates = self._generate_date_range(start_date, end_date)
print(f"Date range: {start_date} to {end_date}")
print(f"Total dates: {len(dates)}")
print(f"Rate limiting: 1s between requests, exponential backoff on 429")
print()
all_data = []
failed_dates = []
for date in tqdm(dates, desc="Collecting Net Positions"):
# Retry logic with exponential backoff
max_retries = 5
base_delay = 60 # Start with 60s on 429 error
success = False
for attempt in range(max_retries):
try:
# Rate limiting: 1 second between all requests
time.sleep(1)
# Convert to pandas Timestamp with UTC timezone
pd_date = pd.Timestamp(date, tz='UTC')
# Query min/max net positions
df = self.client.query_minmax_np(pd_date)
if df is not None and not df.empty:
# CRITICAL: Reset index to preserve mtu timestamps
# Net positions have hourly 'mtu' timestamps in the index
df_with_index = df.reset_index()
# Add date column for reference
df_with_index['collection_date'] = date
all_data.append(df_with_index)
success = True
break # Success - exit retry loop
except HTTPError as e:
if e.response.status_code == 429:
# Rate limited - exponential backoff
wait_time = base_delay * (2 ** attempt)
if attempt < max_retries - 1:
time.sleep(wait_time)
else:
failed_dates.append((date, "429 after retries"))
else:
# Other HTTP error - don't retry
failed_dates.append((date, str(e)))
break
except Exception as e:
# Non-HTTP error
failed_dates.append((date, str(e)))
break
# Report results
print()
print("=" * 70)
print("Net Position Collection Complete")
print("=" * 70)
print(f"Success: {len(all_data)}/{len(dates)} dates")
if failed_dates:
print(f"Failed: {len(failed_dates)} dates")
if len(failed_dates) <= 10:
for date, error in failed_dates:
print(f" {date.date()}: {error}")
else:
print(f" First 10 failures:")
for date, error in failed_dates[:10]:
print(f" {date.date()}: {error}")
if all_data:
# Combine all dataframes
combined_df = pd.concat(all_data, ignore_index=True)
# Convert to Polars
pl_df = pl.from_pandas(combined_df)
# Round float columns to 2 decimals
float_cols = [col for col in pl_df.columns
if pl_df[col].dtype in [pl.Float64, pl.Float32]]
if float_cols:
pl_df = pl_df.with_columns([
pl.col(col).round(2).alias(col)
for col in float_cols
])
# Save to parquet
output_path.parent.mkdir(parents=True, exist_ok=True)
pl_df.write_parquet(output_path)
print()
print(f"Total records: {pl_df.shape[0]:,}")
print(f"Columns: {pl_df.shape[1]}")
print(f"Output: {output_path}")
print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")
print("=" * 70)
return pl_df
else:
print("\n[WARNING] No Net Position data collected")
print("=" * 70)
return None
def collect_external_atc_sample(
self,
start_date: str,
end_date: str,
output_path: Path
) -> Optional[pl.DataFrame]:
"""Collect ATC (Available Transfer Capacity) for external (non-Core) borders.
External borders connect Core FBMC to non-Core zones (e.g., FR-UK, DE-CH, PL-SE).
These capacities affect loop flows and provide context for Core network loading.
NOTE: This method needs to be implemented once the correct JAO API endpoint
for external ATC is identified. Possible sources:
- JAO ATC publications (separate from Core FBMC)
- ENTSO-E Transparency Platform (Forecasted/Offered Capacity)
- Bilateral capacity publications
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_path: Path to save Parquet file
Returns:
Polars DataFrame with external ATC data
"""
import time
print("=" * 70)
print("JAO External ATC Data Collection (Non-Core Borders)")
print("=" * 70)
print("[WARN] IMPLEMENTATION PENDING - Need to identify correct API endpoint")
print()
# TODO: Research correct JAO API method for external ATC
# Candidates:
# 1. JAO ATC-specific publications (if they exist)
# 2. ENTSO-E Transparency API (Forecasted Transfer Capacities)
# 3. Bilateral capacity allocations from TSO websites
# External borders of interest (14 borders × 2 directions = 28):
# FR-UK, FR-ES, FR-CH, FR-IT
# DE-CH, DE-DK1, DE-DK2, DE-NO2, DE-SE4
# PL-SE4, PL-UA
# CZ-UA
# RO-UA, RO-MD
# For now, return None and document that this needs implementation
print("External ATC collection not yet implemented.")
print("Potential data sources:")
print(" 1. ENTSO-E Transparency API: Forecasted Transfer Capacities (Day Ahead)")
print(" 2. JAO bilateral capacity publications")
print(" 3. TSO-specific capacity publications")
print()
print("Recommendation: Collect from ENTSO-E API for consistency")
print("=" * 70)
return None
def collect_final_domain_dense(
self,
start_date: str,
end_date: str,
target_cnec_eics: list[str],
output_path: Path,
use_mirror: bool = True
) -> Optional[pl.DataFrame]:
"""Collect DENSE CNEC time series for specific CNECs from Final Domain.
Phase 2 collection method: Gets complete hourly time series for target CNECs
(binding AND non-binding states) to enable time-series feature engineering.
This method queries the JAO Final Domain publication which contains ALL CNECs
for each hour (DENSE format), not just active/binding constraints.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
target_cnec_eics: List of CNEC EIC codes to collect (e.g., 200 critical CNECs from Phase 1)
output_path: Path to save Parquet file
use_mirror: Use mirror.flowbased.eu for faster bulk downloads (recommended)
Returns:
Polars DataFrame with DENSE CNEC time series data
Data Structure:
- DENSE format: Each CNEC appears every hour (binding or not)
- Columns: mtu (timestamp), tso, cnec_name, cnec_eic, direction, presolved,
ram, fmax, shadow_price, frm, fuaf, ptdf_AT, ptdf_BE, ..., ptdf_SK
- presolved field: True = binding, False = redundant (non-binding)
- Non-binding hours: shadow_price = 0, ram = fmax
Notes:
- Mirror method is MUCH faster: 1 request/day vs 24 requests/day
- Cannot filter by EIC on server side - downloads all CNECs, then filters locally
- For 200 CNECs × 24 months: ~3.5M records (~100-150 MB compressed)
"""
import time
print("=" * 70)
print("JAO Final Domain DENSE CNEC Collection (Phase 2)")
print("=" * 70)
print(f"Date range: {start_date} to {end_date}")
print(f"Target CNECs: {len(target_cnec_eics)}")
print(f"Method: {'Mirror (bulk daily)' if use_mirror else 'Hourly API calls'}")
print()
dates = self._generate_date_range(start_date, end_date)
print(f"Total dates: {len(dates)}")
print(f"Expected records: {len(target_cnec_eics)} CNECs × {len(dates) * 24} hours = {len(target_cnec_eics) * len(dates) * 24:,}")
print()
all_data = []
for date in tqdm(dates, desc="Collecting Final Domain"):
try:
# Convert to pandas Timestamp with UTC timezone
pd_date = pd.Timestamp(date, tz='Europe/Amsterdam')
# Query Final Domain for first hour of the day
# If use_mirror=True, this returns the entire day (24 hours) at once
df = self.client.query_final_domain(
mtu=pd_date,
presolved=None, # ALL CNECs (binding + non-binding) = DENSE!
use_mirror=use_mirror
)
if df is not None and not df.empty:
# Filter to target CNECs only (local filtering)
df_filtered = df[df['cnec_eic'].isin(target_cnec_eics)]
if not df_filtered.empty:
# Add collection date for reference
df_filtered['collection_date'] = date
all_data.append(df_filtered)
# Rate limiting for non-mirror mode
if not use_mirror:
time.sleep(1) # 1 second between requests
except Exception as e:
print(f" Failed for {date.date()}: {e}")
continue
if all_data:
# Combine all dataframes
combined_df = pd.concat(all_data, ignore_index=True)
# Convert to Polars
pl_df = pl.from_pandas(combined_df)
# Validate DENSE structure
unique_cnecs = pl_df['cnec_eic'].n_unique()
unique_hours = pl_df['mtu'].n_unique()
expected_records = unique_cnecs * unique_hours
actual_records = len(pl_df)
print()
print("=" * 70)
print("Final Domain DENSE Collection Complete")
print("=" * 70)
print(f"Total records: {actual_records:,}")
print(f"Unique CNECs: {unique_cnecs}")
print(f"Unique hours: {unique_hours}")
print(f"Expected (DENSE): {expected_records:,}")
if actual_records == expected_records:
print("[OK] DENSE structure validated - all CNECs present every hour")
else:
print(f"[WARN] Structure is SPARSE! Missing {expected_records - actual_records:,} records")
print(" Some CNECs may be missing for some hours")
# Round float columns to 4 decimals (higher precision for PTDFs)
float_cols = [col for col in pl_df.columns
if pl_df[col].dtype in [pl.Float64, pl.Float32]]
if float_cols:
pl_df = pl_df.with_columns([
pl.col(col).round(4).alias(col)
for col in float_cols
])
# Save to parquet
output_path.parent.mkdir(parents=True, exist_ok=True)
pl_df.write_parquet(output_path)
print(f"Columns: {pl_df.shape[1]}")
print(f"Output: {output_path}")
print(f"File size: {output_path.stat().st_size / (1024**2):.2f} MB")
print("=" * 70)
return pl_df
else:
print("No Final Domain data collected")
return None
def collect_cnec_data(
self,
start_date: str,
end_date: str,
output_path: Path
) -> Optional[pl.DataFrame]:
"""Collect CNEC (Critical Network Elements with Contingencies) data.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_path: Path to save Parquet file
Returns:
Polars DataFrame with CNEC data
"""
print("=" * 70)
print("JAO CNEC Data Collection")
print("=" * 70)
dates = self._generate_date_range(start_date, end_date)
print(f"Date range: {start_date} to {end_date}")
print(f"Total dates: {len(dates)}")
print()
all_data = []
for date in tqdm(dates, desc="Collecting CNEC data"):
try:
# Get CNEC data for this date
# Note: Exact method name needs to be verified from jao-py source
df = self.client.query_cnec(date)
if df is not None and not df.empty:
# Add date column
df['collection_date'] = date
all_data.append(df)
except Exception as e:
print(f" ⚠️ Failed for {date.date()}: {e}")
continue
if all_data:
# Combine all dataframes
combined_df = pd.concat(all_data, ignore_index=True)
# Convert to Polars
pl_df = pl.from_pandas(combined_df)
# Save to parquet
output_path.parent.mkdir(parents=True, exist_ok=True)
pl_df.write_parquet(output_path)
print()
print("=" * 70)
print("CNEC Collection Complete")
print("=" * 70)
print(f"Total records: {pl_df.shape[0]:,}")
print(f"Columns: {pl_df.shape[1]}")
print(f"Output: {output_path}")
print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB")
return pl_df
else:
print("❌ No CNEC data collected")
return None
def collect_all_core_data(
self,
start_date: str,
end_date: str,
output_dir: Path
) -> dict:
"""Collect all available Core FBMC data.
This method will be expanded as we discover available methods in jao-py.
Args:
start_date: Start date (YYYY-MM-DD)
end_date: End date (YYYY-MM-DD)
output_dir: Directory to save Parquet files
Returns:
Dictionary with paths to saved files
"""
output_dir.mkdir(parents=True, exist_ok=True)
print("=" * 70)
print("JAO Core FBMC Data Collection")
print("=" * 70)
print(f"Date range: {start_date} to {end_date}")
print(f"Output directory: {output_dir}")
print()
results = {}
# Note: The jao-py documentation is sparse.
# We'll need to explore the client methods to find what's available.
# Common methods might include:
# - query_cnec()
# - query_ptdf()
# - query_ram()
# - query_shadow_prices()
# - query_net_positions()
print("⚠️ Note: jao-py has limited documentation.")
print(" Available methods need to be discovered from source code.")
print(" See: https://github.com/fboerman/jao-py")
print()
# Try to collect CNECs (if method exists)
try:
cnec_path = output_dir / "jao_cnec_2024_2025.parquet"
cnec_df = self.collect_cnec_data(start_date, end_date, cnec_path)
if cnec_df is not None:
results['cnec'] = cnec_path
except AttributeError as e:
print(f"⚠️ CNEC collection not available: {e}")
print(" Check jao-py source for correct method names")
# Placeholder for additional data types
# These will be implemented as we discover the correct methods
print()
print("=" * 70)
print("JAO Collection Summary")
print("=" * 70)
print(f"Files created: {len(results)}")
for data_type, path in results.items():
file_size = path.stat().st_size / (1024**2)
print(f" - {data_type}: {file_size:.1f} MB")
if not results:
print()
print("⚠️ No data collected. This likely means:")
print(" 1. The date range is outside available data (before 2022-06-09)")
print(" 2. The jao-py methods need to be discovered from source code")
print(" 3. Alternative: Manual download from https://publicationtool.jao.eu/core/")
return results
def print_jao_manual_instructions():
"""Print manual download instructions for JAO data."""
print("""
╔══════════════════════════════════════════════════════════════════════════╗
║ JAO DATA ACCESS INSTRUCTIONS ║
╚══════════════════════════════════════════════════════════════════════════╝
Option 1: Use jao-py Python Library (Recommended)
------------------------------------------------
Installed: ✅ jao-py 0.6.2
Available clients:
- JaoPublicationToolPandasClient (Core Day-Ahead, from 2022-06-09)
- JaoPublicationToolPandasIntraDay (Core Intraday, from 2024-05-29)
- JaoPublicationToolPandasNordics (Nordic, from 2024-10-30)
Documentation: https://github.com/fboerman/jao-py
Note: jao-py has sparse documentation. Method discovery required:
1. Explore source code: https://github.com/fboerman/jao-py
2. Check available methods: dir(client)
3. Inspect method signatures: help(client.method_name)
Option 2: Manual Download from JAO Website
-------------------------------------------
1. Visit: https://publicationtool.jao.eu/core/
2. Navigate to data sections:
- CNECs (Critical Network Elements)
- PTDFs (Power Transfer Distribution Factors)
- RAMs (Remaining Available Margins)
- Shadow Prices
- Net Positions
3. Select date range: Oct 2024 - Sept 2025
4. Download format: CSV or Excel
5. Save files to: data/raw/
6. File naming convention:
- jao_cnec_2024-10_2025-09.csv
- jao_ptdf_2024-10_2025-09.csv
- jao_ram_2024-10_2025-09.csv
7. Convert to Parquet (we can add converter script if needed)
Option 3: R Package JAOPuTo (Alternative)
------------------------------------------
If you have R installed:
```r
install.packages("devtools")
devtools::install_github("nicoschoutteet/JAOPuTo")
# Then export data to CSV for Python ingestion
```
Option 4: Contact JAO Support
------------------------------
Email: [email protected]
Subject: Bulk FBMC data download for research
Request: Core FBMC data, Oct 2024 - Sept 2025
════════════════════════════════════════════════════════════════════════════
""")
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(description="Collect JAO FBMC data using jao-py")
parser.add_argument(
'--start-date',
default='2024-10-01',
help='Start date (YYYY-MM-DD)'
)
parser.add_argument(
'--end-date',
default='2025-09-30',
help='End date (YYYY-MM-DD)'
)
parser.add_argument(
'--output-dir',
type=Path,
default=Path('data/raw'),
help='Output directory for Parquet files'
)
parser.add_argument(
'--manual-instructions',
action='store_true',
help='Print manual download instructions and exit'
)
args = parser.parse_args()
if args.manual_instructions:
print_jao_manual_instructions()
else:
try:
collector = JAOCollector()
collector.collect_all_core_data(
start_date=args.start_date,
end_date=args.end_date,
output_dir=args.output_dir
)
except Exception as e:
print(f"\n❌ Error: {e}\n")
print_jao_manual_instructions()