Spaces:
Sleeping
Sleeping
File size: 7,231 Bytes
0df759f |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 |
"""
Find the real Alegro HVDC cable EIC code from ENTSO-E transmission outages.
Strategy: Query BE-DE border transmission outages, parse XML to extract all
Asset_RegisteredResource.mRID codes, and identify which one is Alegro HVDC.
Author: Claude + Evgueni Poloukarov
Date: 2025-11-09
"""
import sys
from pathlib import Path
import pandas as pd
from entsoe import EntsoePandasClient
from dotenv import load_dotenv
import os
import zipfile
from io import BytesIO
import xml.etree.ElementTree as ET
# Add src to path: prepend the 'src' directory that sits beside this script's
# own directory (i.e. <script dir>/../src) so it is searched first on import.
# NOTE(review): nothing in the visible code imports from src — confirm this
# path tweak is still needed.
sys.path.insert(0, str(Path(__file__).parent.parent / 'src'))
# Load environment: read variables from a .env file into the process
# environment; find_alegro_eic() later reads ENTSOE_API_KEY via os.getenv.
load_dotenv()
def _tally_assets_in_document(xml_content, all_assets):
    """Add every outage asset found in one XML document to ``all_assets``.

    For each TimeSeries element the first Asset_RegisteredResource is read and
    its mRID (EIC code) and name are recorded.

    Args:
        xml_content: Raw bytes of a single outage XML document.
        all_assets: Dict mapping EIC -> (name, count); updated in place.
    """
    no_name = '(no name)'
    root = ET.fromstring(xml_content)

    # ElementTree spells namespaced tags as '{uri}tag', so look up the
    # document's default namespace (declared with an empty prefix).
    nsmap = dict(
        node for _, node in ET.iterparse(BytesIO(xml_content), events=['start-ns'])
    )
    ns_uri = nsmap.get('')
    tag_prefix = '{' + ns_uri + '}' if ns_uri else ''

    for ts in root.findall('.//' + tag_prefix + 'TimeSeries'):
        reg_resource = ts.find('.//' + tag_prefix + 'Asset_RegisteredResource')
        if reg_resource is None:
            continue

        mrid_elem = reg_resource.find('.//' + tag_prefix + 'mRID')
        name_elem = reg_resource.find('.//' + tag_prefix + 'name')

        # An element can exist but be empty (text is None); treat that the
        # same as a missing element instead of crashing later on formatting.
        asset_eic = (mrid_elem.text or '').strip() if mrid_elem is not None else ''
        if not asset_eic:
            continue
        asset_name = (name_elem.text or '').strip() if name_elem is not None else ''
        asset_name = asset_name or no_name

        previous = all_assets.get(asset_eic)
        if previous is None:
            all_assets[asset_eic] = (asset_name, 1)
        else:
            # Do not let a later unnamed occurrence erase a real name.
            kept_name = previous[0] if asset_name == no_name else asset_name
            all_assets[asset_eic] = (kept_name, previous[1] + 1)


def find_alegro_eic():
    """Find Alegro EIC by querying BE-DE border outages and parsing XML.

    Requests transmission-unavailability documents (documentType A78) for the
    DE-LU -> BE direction, tallies every Asset_RegisteredResource found in the
    returned ZIP of XML files, prints the tally, and picks the most frequently
    reported asset whose name contains an Alegro-related keyword.

    Returns:
        tuple: ``(eic, name)`` of the first matching asset, or ``(None, None)``
        when the API key is missing, the request/parsing fails, or no asset
        name matches. Always a 2-tuple so callers can unpack unconditionally.
    """
    print("=" * 80)
    print("FINDING ALEGRO REAL EIC CODE FROM ENTSO-E")
    print("=" * 80)
    print()

    api_key = os.getenv('ENTSOE_API_KEY')
    if not api_key:
        print("[ERROR] ENTSOE_API_KEY not found in .env")
        # Same shape as every other return path; the caller unpacks two values.
        return None, None

    client = EntsoePandasClient(api_key=api_key)

    # Belgium and Germany EIC codes
    be_eic = '10YBE----------2'
    de_eic = '10Y1001A1001A82H'  # Germany-Luxembourg

    print("Querying BE-DE border transmission outages...")
    print(f" Belgium EIC: {be_eic}")
    print(f" Germany EIC: {de_eic}")
    print()

    # Fixed sample window (roughly ten weeks); widen it if no outages are found.
    start = pd.Timestamp('2024-09-01', tz='UTC')
    end = pd.Timestamp('2024-11-09', tz='UTC')
    print(f"Period: {start} to {end}")
    print()

    try:
        # Query using _base_request to get the raw response (a ZIP of XML
        # documents) rather than a parsed DataFrame. NOTE: this is a private
        # method of the client and may change between library versions.
        print("Making API request...")
        response = client._base_request(
            params={
                'documentType': 'A78',  # Transmission unavailability
                'in_Domain': de_eic,
                'out_Domain': be_eic,
            },
            start=start,
            end=end,
        )
        print(f"[SUCCESS] Got response ({len(response.content)} bytes)")
        print()

        # Parse ZIP file
        outages_zip = response.content
        print("Parsing ZIP file...")
        all_assets = {}  # EIC -> (name, count)
        with zipfile.ZipFile(BytesIO(outages_zip), 'r') as zf:
            xml_files = [f for f in zf.namelist() if f.endswith('.xml')]
            print(f" Found {len(xml_files)} XML files")
            print()
            for xml_file in xml_files:
                with zf.open(xml_file) as xf:
                    xml_content = xf.read()
                _tally_assets_in_document(xml_content, all_assets)

        print("=" * 80)
        print(f"FOUND {len(all_assets)} UNIQUE TRANSMISSION ASSETS ON BE-DE BORDER")
        print("=" * 80)
        print()

        # Sort by count (descending) to see most frequently affected assets
        sorted_assets = sorted(all_assets.items(), key=lambda x: x[1][1], reverse=True)
        print(f"{'EIC Code':<20} {'Asset Name':<50} {'Outages':<10}")
        print("-" * 80)

        keywords = ('alegro', 'aachen', 'liege', 'oberzier', 'lixhe', 'alde')
        alegro_candidates = []
        for eic, (name, count) in sorted_assets:
            print(f"{eic:<20} {name:<50} {count:<10}")
            # Identify Alegro by name keywords
            name_lower = name.lower()
            if any(keyword in name_lower for keyword in keywords):
                alegro_candidates.append((eic, name, count))

        print()
        print("=" * 80)
        if alegro_candidates:
            print("ALEGRO CANDIDATES FOUND:")
            print("=" * 80)
            for eic, name, count in alegro_candidates:
                print(f" EIC: {eic}")
                print(f" Name: {name}")
                print(f" Outages: {count}")
                print()

            # Return the first candidate (most likely): candidates keep the
            # outage-count ordering, so this is the most frequently reported.
            alegro_eic, alegro_name = alegro_candidates[0][:2]
            print(f"[IDENTIFIED] Alegro EIC: {alegro_eic}")
            print(f"[IDENTIFIED] Alegro Name: {alegro_name}")
            print()
            return alegro_eic, alegro_name

        print("[WARNING] No Alegro assets found in BE-DE border outages")
        print("Possible reasons:")
        print(" 1. Alegro had no outages in the query period")
        print(" 2. HVDC outages are reported differently (separate endpoint)")
        print(" 3. Alegro is classified under a different border/domain")
        print()
        print("Try:")
        print(" 1. Expanding date range (query last 12-24 months)")
        print(" 2. Querying bidirectional (DE->BE as well as BE->DE)")
        print(" 3. Checking if HVDC requires different documentType")
        print()
        return None, None
    except Exception as e:
        # Top-level boundary of a diagnostic script: report the failure with a
        # traceback and signal "not found" rather than aborting the caller.
        print(f"[ERROR] Failed to query transmission outages: {e}")
        import traceback
        traceback.print_exc()
        return None, None
if __name__ == '__main__':
    alegro_eic, alegro_name = find_alegro_eic()
    # Follow-up instructions are only useful once an EIC has been identified.
    if alegro_eic:
        banner = "=" * 80
        next_steps = [
            "",
            banner,
            "NEXT STEPS:",
            banner,
            f"1. Replace custom Alegro EICs in cnecs_alegro_8.csv with: {alegro_eic}",
            "2. Update master CNEC list to use real EIC code",
            "3. Re-run asset-specific outage collection with real EIC",
            "",
        ]
        for line in next_steps:
            print(line)
|