"""Master script to collect complete JAO FBMC dataset.
Collects all 5 JAO datasets in sequence:
1. MaxBEX (target variable) - 132 borders
2. CNECs/PTDFs (network constraints) - ~200 CNECs with 27 columns
3. LTA (long-term allocations) - 38 borders
4. Net Positions (domain boundaries) - 12 zones
5. External ATC (non-Core borders) - 28 directions [PENDING IMPLEMENTATION]
Usage:
# 1-week sample (testing)
python scripts/collect_jao_complete.py \
--start-date 2025-09-23 \
--end-date 2025-09-30 \
--output-dir data/raw/sample_complete
# Full 24-month dataset
python scripts/collect_jao_complete.py \
--start-date 2023-10-01 \
--end-date 2025-09-30 \
--output-dir data/raw/full
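
    # Resume a partial run, skipping datasets that are already on disk
    python scripts/collect_jao_complete.py \
        --start-date 2023-10-01 \
        --end-date 2025-09-30 \
        --output-dir data/raw/full \
        --skip-maxbex --skip-cnec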
"""
import argparse
import sys
from datetime import datetime
from pathlib import Path

# Put src/ on sys.path so data_collection can be imported without installing the package
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
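
# JAOCollector is expected to expose one collect_*_sample method per dataset
# (collect_maxbex_sample, collect_cnec_ptdf_sample, collect_lta_sample,
# collect_net_positions_sample, collect_external_atc_sample), each taking
# start_date, end_date and output_path, and returning a DataFrame or None.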
from data_collection.collect_jao import JAOCollector


def main():
    """Collect the complete JAO dataset (all 5 sources)."""
    parser = argparse.ArgumentParser(
        description="Collect complete JAO FBMC dataset"
    )
    parser.add_argument(
        '--start-date',
        required=True,
        help='Start date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--end-date',
        required=True,
        help='End date (YYYY-MM-DD)'
    )
    parser.add_argument(
        '--output-dir',
        type=Path,
        required=True,
        help='Output directory for all datasets'
    )
    parser.add_argument(
        '--skip-maxbex',
        action='store_true',
        help='Skip MaxBEX collection (if already collected)'
    )
    parser.add_argument(
        '--skip-cnec',
        action='store_true',
        help='Skip CNEC/PTDF collection (if already collected)'
    )
    parser.add_argument(
        '--skip-lta',
        action='store_true',
        help='Skip LTA collection (if already collected)'
    )
    args = parser.parse_args()

    # Create output directory
    args.output_dir.mkdir(parents=True, exist_ok=True)

    # Print run header
    print("\n" + "=" * 80)
    print("JAO COMPLETE DATA COLLECTION PIPELINE")
    print("=" * 80)
    print(f"Period: {args.start_date} to {args.end_date}")
    print(f"Output: {args.output_dir}")
    print()

    # Initialize collector
    collector = JAOCollector()

    # Track results
    results = {}
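    # Each entry records a status ('SUCCESS', 'FAILED', 'ERROR', 'SKIPPED',
    # 'WARNING' or 'PENDING') plus, on success, record/column counts and the
    # output file path; the summary below is driven entirely by this dict.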
    start_time = datetime.now()

    # Dataset 1: MaxBEX (Target Variable)
    if not args.skip_maxbex:
        print("\n" + "-" * 80)
        print("DATASET 1/5: MaxBEX (Target Variable)")
        print("-" * 80)
        try:
            maxbex_df = collector.collect_maxbex_sample(
                start_date=args.start_date,
                end_date=args.end_date,
                output_path=args.output_dir / "jao_maxbex.parquet"
            )
            if maxbex_df is not None:
                results['maxbex'] = {
                    'status': 'SUCCESS',
                    'records': maxbex_df.shape[0],
                    'columns': maxbex_df.shape[1],
                    'file': args.output_dir / "jao_maxbex.parquet"
                }
            else:
                results['maxbex'] = {'status': 'FAILED', 'error': 'No data collected'}
        except Exception as e:
            results['maxbex'] = {'status': 'ERROR', 'error': str(e)}
            print(f"[ERROR] MaxBEX collection failed: {e}")
    else:
        results['maxbex'] = {'status': 'SKIPPED'}
        print("\n[SKIPPED] MaxBEX collection")
    # Dataset 2: CNECs/PTDFs (Network Constraints)
    if not args.skip_cnec:
        print("\n" + "-" * 80)
        print("DATASET 2/5: CNECs/PTDFs (Network Constraints)")
        print("-" * 80)
        try:
            cnec_df = collector.collect_cnec_ptdf_sample(
                start_date=args.start_date,
                end_date=args.end_date,
                output_path=args.output_dir / "jao_cnec_ptdf.parquet"
            )
            if cnec_df is not None:
                results['cnec_ptdf'] = {
                    'status': 'SUCCESS',
                    'records': cnec_df.shape[0],
                    'columns': cnec_df.shape[1],
                    'file': args.output_dir / "jao_cnec_ptdf.parquet"
                }
            else:
                results['cnec_ptdf'] = {'status': 'FAILED', 'error': 'No data collected'}
        except Exception as e:
            results['cnec_ptdf'] = {'status': 'ERROR', 'error': str(e)}
            print(f"[ERROR] CNEC/PTDF collection failed: {e}")
    else:
        results['cnec_ptdf'] = {'status': 'SKIPPED'}
        print("\n[SKIPPED] CNEC/PTDF collection")
    # Dataset 3: LTA (Long-Term Allocations)
    if not args.skip_lta:
        print("\n" + "-" * 80)
        print("DATASET 3/5: LTA (Long-Term Allocations)")
        print("-" * 80)
        try:
            lta_df = collector.collect_lta_sample(
                start_date=args.start_date,
                end_date=args.end_date,
                output_path=args.output_dir / "jao_lta.parquet"
            )
            if lta_df is not None:
                results['lta'] = {
                    'status': 'SUCCESS',
                    'records': lta_df.shape[0],
                    'columns': lta_df.shape[1],
                    'file': args.output_dir / "jao_lta.parquet"
                }
            else:
                results['lta'] = {'status': 'WARNING', 'error': 'No LTA data (may be expected)'}
        except Exception as e:
            results['lta'] = {'status': 'ERROR', 'error': str(e)}
            print(f"[ERROR] LTA collection failed: {e}")
    else:
        results['lta'] = {'status': 'SKIPPED'}
        print("\n[SKIPPED] LTA collection")
    # Dataset 4: Net Positions (Domain Boundaries)
    print("\n" + "-" * 80)
    print("DATASET 4/5: Net Positions (Domain Boundaries)")
    print("-" * 80)
    try:
        net_pos_df = collector.collect_net_positions_sample(
            start_date=args.start_date,
            end_date=args.end_date,
            output_path=args.output_dir / "jao_net_positions.parquet"
        )
        if net_pos_df is not None:
            results['net_positions'] = {
                'status': 'SUCCESS',
                'records': net_pos_df.shape[0],
                'columns': net_pos_df.shape[1],
                'file': args.output_dir / "jao_net_positions.parquet"
            }
        else:
            results['net_positions'] = {'status': 'FAILED', 'error': 'No data collected'}
    except Exception as e:
        results['net_positions'] = {'status': 'ERROR', 'error': str(e)}
        print(f"[ERROR] Net Positions collection failed: {e}")
    # Dataset 5: External ATC (Non-Core Borders)
    print("\n" + "-" * 80)
    print("DATASET 5/5: External ATC (Non-Core Borders)")
    print("-" * 80)
    try:
        atc_df = collector.collect_external_atc_sample(
            start_date=args.start_date,
            end_date=args.end_date,
            output_path=args.output_dir / "jao_external_atc.parquet"
        )
        if atc_df is not None:
            results['external_atc'] = {
                'status': 'SUCCESS',
                'records': atc_df.shape[0],
                'columns': atc_df.shape[1],
                'file': args.output_dir / "jao_external_atc.parquet"
            }
        else:
            results['external_atc'] = {
                'status': 'PENDING',
                'error': 'Implementation not complete - see ENTSO-E API'
            }
    except Exception as e:
        results['external_atc'] = {'status': 'ERROR', 'error': str(e)}
        print(f"[ERROR] External ATC collection failed: {e}")
    # Final Summary
    end_time = datetime.now()
    duration = end_time - start_time

    print("\n\n" + "=" * 80)
    print("COLLECTION SUMMARY")
    print("=" * 80)
    print(f"Period: {args.start_date} to {args.end_date}")
    print(f"Duration: {duration}")
    print()

    for dataset, result in results.items():
        status = result['status']
        if status == 'SUCCESS':
            print(f"[OK]   {dataset:20s}: {result['records']:,} records, {result['columns']} columns")
            if 'file' in result:
                size_mb = result['file'].stat().st_size / (1024**2)
                print(f"       {'':<20s} File: {result['file']} ({size_mb:.2f} MB)")
        elif status == 'SKIPPED':
            print(f"[SKIP] {dataset:20s}: Skipped by user")
        elif status == 'PENDING':
            print(f"[PEND] {dataset:20s}: {result.get('error', 'Implementation pending')}")
        elif status == 'WARNING':
            print(f"[WARN] {dataset:20s}: {result.get('error', 'No data')}")
        elif status == 'FAILED':
            print(f"[FAIL] {dataset:20s}: {result.get('error', 'Collection failed')}")
        elif status == 'ERROR':
            print(f"[ERR]  {dataset:20s}: {result.get('error', 'Unknown error')}")

    # Count successes (skipped datasets are excluded from the denominator)
    successful = sum(1 for r in results.values() if r['status'] == 'SUCCESS')
    total = sum(1 for r in results.values() if r['status'] != 'SKIPPED')

    print()
    print(f"Successful collections: {successful}/{total}")
    print("=" * 80)
    if successful == total:
        print("\n[OK] All datasets collected successfully!")
        sys.exit(0)
    elif successful > 0:
        print("\n[WARN] Partial collection - some datasets failed")
        sys.exit(1)
    else:
        print("\n[ERROR] Collection failed - no datasets collected")
        sys.exit(2)


if __name__ == "__main__":
    main()