"""OpenMeteo Weather Data Collection with Proper Rate Limiting Collects historical weather data from OpenMeteo API for 52 strategic grid points. Implements proper rate limiting based on actual OpenMeteo free tier limits. OpenMeteo Free Tier Limits (ACTUAL): - 600 calls/minute - 5,000 calls/hour - 10,000 calls/day - 300,000 calls/month Request Counting: - Base request (≤10 variables, ≤2 weeks) = 1.0 API call - >10 variables OR >2 weeks = Multiple calls (fractional) - Example: 4 weeks = 3.0 API calls, 8 weeks = 7.0 API calls Strategy: - Request data in 2-week chunks (stays at 1.0 API call per request) - 7 weather parameters (under 10 limit) - 270 requests/minute (45% of 600 limit - safe but efficient) - ~5 minutes total for 12 months × 52 locations """ import requests import polars as pl from pathlib import Path from datetime import datetime, timedelta from dotenv import load_dotenv import os import time from typing import List, Dict, Tuple from tqdm import tqdm # Load environment variables load_dotenv() # 52 Strategic Grid Points (from project plan) GRID_POINTS = { # Germany (6 points) "DE_North_Sea": {"lat": 54.5, "lon": 7.0, "name": "Offshore North Sea"}, "DE_Hamburg": {"lat": 53.5, "lon": 10.0, "name": "Hamburg/Schleswig-Holstein"}, "DE_Berlin": {"lat": 52.5, "lon": 13.5, "name": "Berlin/Brandenburg"}, "DE_Frankfurt": {"lat": 50.1, "lon": 8.7, "name": "Frankfurt"}, "DE_Munich": {"lat": 48.1, "lon": 11.6, "name": "Munich/Bavaria"}, "DE_Baltic": {"lat": 54.5, "lon": 13.0, "name": "Offshore Baltic"}, # France (5 points) "FR_Dunkirk": {"lat": 51.0, "lon": 2.3, "name": "Dunkirk/Lille"}, "FR_Paris": {"lat": 48.9, "lon": 2.3, "name": "Paris"}, "FR_Lyon": {"lat": 45.8, "lon": 4.8, "name": "Lyon"}, "FR_Marseille": {"lat": 43.3, "lon": 5.4, "name": "Marseille"}, "FR_Strasbourg": {"lat": 48.6, "lon": 7.8, "name": "Strasbourg"}, # Netherlands (4 points) "NL_Offshore": {"lat": 53.5, "lon": 4.5, "name": "Offshore North"}, "NL_Amsterdam": {"lat": 52.4, "lon": 4.9, "name": "Amsterdam"}, "NL_Rotterdam": {"lat": 51.9, "lon": 4.5, "name": "Rotterdam"}, "NL_Groningen": {"lat": 53.2, "lon": 6.6, "name": "Groningen"}, # Austria (3 points) "AT_Kaprun": {"lat": 47.26, "lon": 12.74, "name": "Kaprun"}, "AT_St_Peter": {"lat": 48.26, "lon": 13.08, "name": "St. Peter"}, "AT_Vienna": {"lat": 48.15, "lon": 16.45, "name": "Vienna"}, # Belgium (3 points) "BE_Offshore": {"lat": 51.5, "lon": 2.8, "name": "Belgian Offshore"}, "BE_Doel": {"lat": 51.32, "lon": 4.26, "name": "Doel"}, "BE_Avelgem": {"lat": 50.78, "lon": 3.45, "name": "Avelgem"}, # Czech Republic (3 points) "CZ_Hradec": {"lat": 50.70, "lon": 13.80, "name": "Hradec-RPST"}, "CZ_Bohemia": {"lat": 50.50, "lon": 13.60, "name": "Northwest Bohemia"}, "CZ_Temelin": {"lat": 49.18, "lon": 14.37, "name": "Temelin"}, # Poland (4 points) "PL_Baltic": {"lat": 54.8, "lon": 17.5, "name": "Baltic Offshore"}, "PL_SHVDC": {"lat": 54.5, "lon": 17.0, "name": "SwePol Link"}, "PL_Belchatow": {"lat": 51.27, "lon": 19.32, "name": "Belchatow"}, "PL_Mikulowa": {"lat": 51.5, "lon": 15.2, "name": "Mikulowa PST"}, # Hungary (3 points) "HU_Paks": {"lat": 46.57, "lon": 18.86, "name": "Paks Nuclear"}, "HU_Bekescsaba": {"lat": 46.68, "lon": 21.09, "name": "Bekescsaba"}, "HU_Gyor": {"lat": 47.68, "lon": 17.63, "name": "Gyor"}, # Romania (3 points) "RO_Fantanele": {"lat": 44.59, "lon": 28.57, "name": "Fantanele-Cogealac"}, "RO_Iron_Gates": {"lat": 44.67, "lon": 22.53, "name": "Iron Gates"}, "RO_Cernavoda": {"lat": 44.32, "lon": 28.03, "name": "Cernavoda"}, # Slovakia (3 points) "SK_Bohunice": {"lat": 48.49, "lon": 17.68, "name": "Bohunice/Mochovce"}, "SK_Gabcikovo": {"lat": 47.88, "lon": 17.54, "name": "Gabcikovo"}, "SK_Rimavska": {"lat": 48.38, "lon": 20.00, "name": "Rimavska Sobota"}, # Slovenia (2 points) "SI_Krsko": {"lat": 45.94, "lon": 15.52, "name": "Krsko Nuclear"}, "SI_Divaca": {"lat": 45.68, "lon": 13.97, "name": "Divaca"}, # Croatia (2 points) "HR_Ernestinovo": {"lat": 45.47, "lon": 18.66, "name": "Ernestinovo"}, "HR_Zagreb": {"lat": 45.88, "lon": 16.12, "name": "Zagreb"}, # Luxembourg (2 points) "LU_Trier": {"lat": 49.75, "lon": 6.63, "name": "Trier/Aach"}, "LU_Bauler": {"lat": 49.92, "lon": 6.20, "name": "Bauler"}, # External regions (8 points) "CH_Central": {"lat": 46.85, "lon": 9.0, "name": "Switzerland Central"}, "UK_Southeast": {"lat": 51.5, "lon": 0.0, "name": "UK Southeast"}, "ES_North": {"lat": 43.3, "lon": -3.0, "name": "Spain North"}, "IT_North": {"lat": 45.5, "lon": 9.2, "name": "Italy North"}, "NO_South": {"lat": 59.0, "lon": 5.7, "name": "Norway South"}, "SE_South": {"lat": 56.0, "lon": 13.0, "name": "Sweden South"}, "DK_West": {"lat": 56.0, "lon": 9.0, "name": "Denmark West"}, "DK_East": {"lat": 55.7, "lon": 12.6, "name": "Denmark East"}, } # Weather parameters to collect (7 params - under 10 limit) WEATHER_PARAMS = [ 'temperature_2m', 'windspeed_10m', 'windspeed_100m', 'winddirection_100m', 'shortwave_radiation', 'cloudcover', 'surface_pressure', ] class OpenMeteoCollector: """Collect weather data from OpenMeteo API with proper rate limiting.""" def __init__( self, requests_per_minute: int = 270, chunk_days: int = 14 ): """Initialize collector with rate limiting. Args: requests_per_minute: Max HTTP requests per minute (default: 270 = 45% of 600 limit) chunk_days: Days per request chunk (default: 14 = 1.0 API call) """ self.base_url = os.getenv('OPENMETEO_BASE_URL', 'https://api.open-meteo.com/v1/forecast') # OpenMeteo historical data endpoint (free tier) self.historical_url = 'https://archive-api.open-meteo.com/v1/archive' self.requests_per_minute = requests_per_minute self.chunk_days = chunk_days self.delay_seconds = 60.0 / requests_per_minute # Delay between requests self.session = requests.Session() self.total_api_calls = 0 # Track actual API call count def _generate_date_chunks( self, start_date: str, end_date: str ) -> List[Tuple[str, str]]: """Generate date range chunks of specified size. Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: List of (start, end) date tuples """ start_dt = datetime.fromisoformat(start_date) end_dt = datetime.fromisoformat(end_date) chunks = [] current = start_dt while current < end_dt: chunk_end = min(current + timedelta(days=self.chunk_days - 1), end_dt) chunks.append(( current.strftime('%Y-%m-%d'), chunk_end.strftime('%Y-%m-%d') )) current = chunk_end + timedelta(days=1) return chunks def _calculate_api_calls(self, start_date: str, end_date: str) -> float: """Calculate how many API calls this request will consume. Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: Number of API calls (fractional) """ start_dt = datetime.fromisoformat(start_date) end_dt = datetime.fromisoformat(end_date) days = (end_dt - start_dt).days + 1 # OpenMeteo counting: ≤14 days = 1.0 call # >14 days scales fractionally if days <= 14: return 1.0 else: return days / 14.0 def fetch_location_chunk( self, location_id: str, location_data: Dict, start_date: str, end_date: str ) -> pl.DataFrame: """Fetch weather data for a single location and date chunk. Args: location_id: Location identifier (e.g., 'DE_Hamburg') location_data: Dict with 'lat', 'lon', 'name' start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) Returns: Polars DataFrame with weather data """ params = { 'latitude': location_data['lat'], 'longitude': location_data['lon'], 'hourly': ','.join(WEATHER_PARAMS), 'start_date': start_date, 'end_date': end_date, 'timezone': 'UTC' } # Calculate API call cost api_calls = self._calculate_api_calls(start_date, end_date) self.total_api_calls += api_calls try: response = self.session.get( self.historical_url, params=params, timeout=30 ) response.raise_for_status() data = response.json() # Parse hourly data hourly = data.get('hourly', {}) timestamps = hourly.get('time', []) if not timestamps: return pl.DataFrame() # Build dataframe df_data = { 'timestamp': timestamps, 'grid_point': [location_id] * len(timestamps), 'location_name': [location_data['name']] * len(timestamps), 'latitude': [location_data['lat']] * len(timestamps), 'longitude': [location_data['lon']] * len(timestamps), } # Add weather parameters for param in WEATHER_PARAMS: df_data[param] = hourly.get(param, [None] * len(timestamps)) df = pl.DataFrame(df_data) # Convert timestamp to datetime df = df.with_columns( pl.col('timestamp').str.strptime(pl.Datetime, format='%Y-%m-%dT%H:%M') ) return df except requests.exceptions.RequestException as e: print(f"[ERROR] Failed {location_id} ({start_date} to {end_date}): {e}") return pl.DataFrame() def collect_all( self, start_date: str, end_date: str, output_path: Path ) -> pl.DataFrame: """Collect weather data for all 52 grid points with rate limiting. Args: start_date: Start date (YYYY-MM-DD) end_date: End date (YYYY-MM-DD) output_path: Path to save Parquet file Returns: Combined Polars DataFrame """ # Generate date chunks date_chunks = self._generate_date_chunks(start_date, end_date) total_requests = len(GRID_POINTS) * len(date_chunks) estimated_minutes = total_requests / self.requests_per_minute print("=" * 70) print("OpenMeteo Weather Data Collection") print("=" * 70) print(f"Date range: {start_date} to {end_date}") print(f"Grid points: {len(GRID_POINTS)}") print(f"Date chunks: {len(date_chunks)} ({self.chunk_days}-day periods)") print(f"Total HTTP requests: {total_requests}") print(f"Rate limit: {self.requests_per_minute} requests/minute (45% of 600 max)") print(f"Estimated time: {estimated_minutes:.1f} minutes") print(f"Delay between requests: {self.delay_seconds:.2f}s") print() all_data = [] request_count = 0 # Iterate through all locations and date chunks with tqdm(total=total_requests, desc="Fetching weather data") as pbar: for location_id, location_data in GRID_POINTS.items(): location_chunks = [] for start_chunk, end_chunk in date_chunks: # Fetch this chunk df = self.fetch_location_chunk( location_id, location_data, start_chunk, end_chunk ) if not df.is_empty(): location_chunks.append(df) request_count += 1 pbar.update(1) # Rate limiting - wait before next request time.sleep(self.delay_seconds) # Combine all chunks for this location if location_chunks: location_df = pl.concat(location_chunks) all_data.append(location_df) print(f"[OK] {location_id}: {location_df.shape[0]} hours") # Combine all dataframes if all_data: combined_df = pl.concat(all_data) # Save to parquet output_path.parent.mkdir(parents=True, exist_ok=True) combined_df.write_parquet(output_path) print() print("=" * 70) print("Collection Complete") print("=" * 70) print(f"Total HTTP requests: {request_count}") print(f"Total API calls consumed: {self.total_api_calls:.1f}") print(f"Total records: {combined_df.shape[0]:,}") print(f"Date range: {combined_df['timestamp'].min()} to {combined_df['timestamp'].max()}") print(f"Grid points: {combined_df['grid_point'].n_unique()}") # Calculate completeness (fix: extract scalar from Polars) null_count_total = combined_df.null_count().sum_horizontal()[0] completeness = (1 - null_count_total / (combined_df.shape[0] * combined_df.shape[1])) * 100 print(f"Completeness: {completeness:.2f}%") print(f"Output: {output_path}") print(f"File size: {output_path.stat().st_size / (1024**2):.1f} MB") return combined_df else: print("[ERROR] No data collected") return pl.DataFrame() if __name__ == "__main__": import argparse parser = argparse.ArgumentParser(description="Collect OpenMeteo weather data with proper rate limiting") parser.add_argument( '--start-date', default='2024-10-01', help='Start date (YYYY-MM-DD)' ) parser.add_argument( '--end-date', default='2025-09-30', help='End date (YYYY-MM-DD)' ) parser.add_argument( '--output', type=Path, default=Path('data/raw/weather_2024_2025.parquet'), help='Output Parquet file path' ) parser.add_argument( '--requests-per-minute', type=int, default=270, help='HTTP requests per minute (default: 270 = 45%% of 600 limit)' ) parser.add_argument( '--chunk-days', type=int, default=14, help='Days per request chunk (default: 14 = 1.0 API call)' ) args = parser.parse_args() # Initialize collector and run collector = OpenMeteoCollector( requests_per_minute=args.requests_per_minute, chunk_days=args.chunk_days ) collector.collect_all( start_date=args.start_date, end_date=args.end_date, output_path=args.output )