import json
import os
import time
import tempfile
from datetime import datetime, timezone, timedelta
from collections import defaultdict

from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.errors import HfHubHTTPError
from dotenv import load_dotenv
import duckdb
import backoff
import requests.exceptions

# Load environment variables
load_dotenv()

# =============================================================================
# CONFIGURATION
# =============================================================================
AGENTS_REPO = "SWE-Arena/bot_metadata"
REVIEW_METADATA_REPO = "SWE-Arena/review_metadata"
LEADERBOARD_REPO = "SWE-Arena/leaderboard_metadata"  # HuggingFace dataset for leaderboard data

LEADERBOARD_TIME_FRAME_DAYS = 180  # Time frame for the leaderboard
GHARCHIVE_DATA_DIR = "../gharchive/data"  # Local GHArchive data directory
DUCKDB_CACHE_FILE = "gharchive_cache.duckdb"  # Persistent DuckDB database for caching

# Upload configuration
UPLOAD_DELAY_SECONDS = 2  # Delay between individual file uploads to avoid rate limits
MAX_RETRIES = 5           # Maximum number of retries for each upload
INITIAL_BACKOFF = 60      # Initial backoff time in seconds (1 minute)
MAX_BACKOFF = 3600        # Maximum backoff time in seconds (60 minutes)

# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def load_jsonl(filename):
    """Load JSONL file and return list of dictionaries."""
    if not os.path.exists(filename):
        return []

    data = []
    with open(filename, 'r', encoding='utf-8') as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    data.append(json.loads(line))
                except json.JSONDecodeError as e:
                    print(f"Warning: Skipping invalid JSON line: {e}")
    return data

def save_jsonl(filename, data):
    """Save list of dictionaries to JSONL file."""
    with open(filename, 'w', encoding='utf-8') as f:
        for item in data:
            f.write(json.dumps(item) + '\n')

def normalize_date_format(date_string):
    """
    Convert date strings to standardized ISO 8601 format with Z suffix.
    Handles both 'T' and space-separated datetime formats (including newlines).

    Examples:
        - 2025-10-15T23:23:47.983068 -> 2025-10-15T23:23:47Z
        - 2025-06-17 21:21:07+00     -> 2025-06-17T21:21:07Z
    """
    if not date_string or date_string == 'N/A':
        return 'N/A'

    try:
        import re

        # Collapse all whitespace (spaces, newlines, tabs) into single spaces
        date_string = re.sub(r'\s+', ' ', date_string.strip())

        # Replace the space separator with 'T' for ISO format compatibility
        date_string = date_string.replace(' ', 'T')

        # Fix incomplete timezone offsets (+00 or -00 -> +00:00 or -00:00)
        if len(date_string) >= 3:
            if date_string[-3:-2] in ('+', '-') and ':' not in date_string[-3:]:
                date_string = date_string + ':00'

        # Parse the date string (handles both with and without microseconds)
        dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))

        # Convert to the standardized format
        return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
    except Exception as e:
        print(f"Warning: Could not parse date '{date_string}': {e}")
        return date_string
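
# Illustrative only (not part of the original script): expected behavior of
# normalize_date_format on the formats listed in its docstring.
#
#   normalize_date_format('2025-10-15T23:23:47.983068')  -> '2025-10-15T23:23:47Z'
#   normalize_date_format('2025-06-17 21:21:07+00')      -> '2025-06-17T21:21:07Z'
#   normalize_date_format('N/A')                          -> 'N/A'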

def get_hf_token():
    """Get HuggingFace token from environment variables."""
    token = os.getenv('HF_TOKEN')
    if not token:
        print("Warning: HF_TOKEN not found in environment variables")
    return token

# =============================================================================
# HUGGINGFACE API WRAPPERS WITH ENHANCED BACKOFF
# =============================================================================
def is_retryable_error(e):
    """
    Check if an exception is retryable (rate-limit or timeout error).
    """
    # Rate limit error (HTTP 429)
    if isinstance(e, HfHubHTTPError):
        if e.response is not None and e.response.status_code == 429:
            return True

    # Explicit timeout exceptions from requests
    if isinstance(e, (requests.exceptions.Timeout,
                      requests.exceptions.ReadTimeout,
                      requests.exceptions.ConnectTimeout)):
        return True

    # Timeout errors wrapped in other exception types (match on the message)
    error_str = str(e).lower()
    if 'timeout' in error_str or 'timed out' in error_str:
        return True

    return False

# Shared retry policy implementing the exponential backoff described in the wrapper
# docstrings below: retry only retryable (rate-limit/timeout) errors, with exponential
# waits starting at INITIAL_BACKOFF and capped at MAX_BACKOFF.
_hf_retry = backoff.on_exception(backoff.expo, Exception, max_tries=MAX_RETRIES,
                                 giveup=lambda e: not is_retryable_error(e),
                                 factor=INITIAL_BACKOFF, max_value=MAX_BACKOFF)

@_hf_retry
def list_repo_files_with_backoff(api, **kwargs):
    """Wrapper for api.list_repo_files() with exponential backoff for retryable errors."""
    return api.list_repo_files(**kwargs)

@_hf_retry
def hf_hub_download_with_backoff(**kwargs):
    """Wrapper for hf_hub_download() with exponential backoff for retryable errors."""
    return hf_hub_download(**kwargs)

@_hf_retry
def upload_file_with_backoff(api, **kwargs):
    """Wrapper for api.upload_file() with exponential backoff for retryable errors."""
    return api.upload_file(**kwargs)

@_hf_retry
def upload_folder_with_backoff(api, **kwargs):
    """Wrapper for api.upload_folder() with exponential backoff for retryable errors."""
    return api.upload_folder(**kwargs)

def get_duckdb_connection():
    """
    Initialize a DuckDB connection with a persistent database and tuned parallelism.

    Returns:
        DuckDB connection object
    """
    # Use a persistent database so query results can be cached across runs
    conn = duckdb.connect(DUCKDB_CACHE_FILE)

    # Tune for a large host (96-core CPU, ~750GB RAM)
    conn.execute("SET threads TO 48;")                        # Use 48 worker threads
    conn.execute("SET memory_limit = '400GB';")               # Cap DuckDB memory usage
    conn.execute("SET preserve_insertion_order = false;")     # Better parallelization
    conn.execute("SET enable_object_cache = true;")           # Cache objects for reuse
    conn.execute("SET temp_directory = '/tmp/duckdb_temp';")  # Fast temp storage for spills
    return conn
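
# Illustrative only (not part of the original script): one way to confirm the settings
# took effect on a given host; the exact formatting of the returned values may differ.
#
#   conn = get_duckdb_connection()
#   conn.execute("SELECT current_setting('threads')").fetchone()       # e.g. (48,)
#   conn.execute("SELECT current_setting('memory_limit')").fetchone()  # e.g. ('400.0 GiB',)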

def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DIR):
    """
    Generate file path patterns for GHArchive data in a date range.

    Args:
        start_date: Start datetime
        end_date: End datetime
        data_dir: Directory containing GHArchive data files

    Returns:
        List of file path patterns (one per day)
    """
    file_patterns = []
    current_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
    end_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0)

    while current_date <= end_day:
        # Pattern for a daily parquet file, e.g. 2024-11-15.parquet
        pattern = os.path.join(data_dir, f"{current_date.strftime('%Y-%m-%d')}.parquet")
        file_patterns.append(pattern)
        # Move to the next day
        current_date += timedelta(days=1)

    return file_patterns
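
# Illustrative only (not part of the original script): a two-day range yields one
# parquet path per day, e.g.
#
#   generate_file_path_patterns(datetime(2024, 11, 15, tzinfo=timezone.utc),
#                               datetime(2024, 11, 16, tzinfo=timezone.utc))
#   -> ['../gharchive/data/2024-11-15.parquet', '../gharchive/data/2024-11-16.parquet']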

# =============================================================================
# DUCKDB QUERY FUNCTIONS
# =============================================================================
def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
    """
    Fetch PR review metadata for ALL agents using ONE comprehensive DuckDB query.

    The query combines:
        1. Review activity for all agents (PullRequestReviewEvent, IssueCommentEvent
           on PRs, and PullRequestReviewCommentEvent)
        2. PR status (PullRequestEvent with action='closed')

    Args:
        conn: DuckDB connection instance
        identifiers: List of GitHub usernames/bot identifiers
        start_date: Start datetime (timezone-aware)
        end_date: End datetime (timezone-aware)

    Returns:
        Dictionary mapping agent identifier to a list of PR metadata:
        {
            'agent-identifier': [
                {
                    'url': PR URL,
                    'reviewed_at': Review timestamp,
                    'merged_at': Merge timestamp (if merged, else None),
                    'closed_at': Close timestamp (if closed, else None)
                },
                ...
            ],
            ...
        }
    """
    print(f"Querying DuckDB for ALL {len(identifiers)} agents in ONE QUERY")
    print(f"   Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")

    # Generate file path patterns for the review period
    review_patterns = generate_file_path_patterns(start_date, end_date)

    # Generate file path patterns for PR status (use the same lookback as reviews)
    status_start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
    status_patterns = generate_file_path_patterns(status_start_date, end_date)

    # Build the identifier list for the IN clause
    identifier_list = ', '.join([f"'{id}'" for id in identifiers])

    # Build the comprehensive query with CTEs using parameterized file lists (Parquet optimized)
    query = f"""
        WITH review_events AS (
            -- Get all review events for ALL agents
            SELECT
                payload.pull_request.html_url as url,
                COALESCE(
                    payload.review.submitted_at,
                    CAST(created_at AS VARCHAR)
                ) as reviewed_at,
                actor.login as reviewer,
                repo.name as repo_name,
                CAST(payload.pull_request.number AS INTEGER) as pr_number
            FROM read_parquet($review_patterns, union_by_name=true, filename=true)
            WHERE
                type = 'PullRequestReviewEvent'
                AND actor.login IN ({identifier_list})
                AND payload.pull_request.html_url IS NOT NULL

            UNION ALL

            -- Get PR comments (IssueCommentEvent on PRs)
            SELECT
                payload.issue.html_url as url,
                CAST(created_at AS VARCHAR) as reviewed_at,
                actor.login as reviewer,
                repo.name as repo_name,
                CAST(payload.issue.number AS INTEGER) as pr_number
            FROM read_parquet($review_patterns, union_by_name=true, filename=true)
            WHERE
                type = 'IssueCommentEvent'
                AND actor.login IN ({identifier_list})
                AND payload.issue.pull_request.url IS NOT NULL
                AND payload.issue.html_url IS NOT NULL

            UNION ALL

            -- Get review comments (PullRequestReviewCommentEvent)
            SELECT
                payload.pull_request.html_url as url,
                CAST(created_at AS VARCHAR) as reviewed_at,
                actor.login as reviewer,
                repo.name as repo_name,
                CAST(payload.pull_request.number AS INTEGER) as pr_number
            FROM read_parquet($review_patterns, union_by_name=true, filename=true)
            WHERE
                type = 'PullRequestReviewCommentEvent'
                AND actor.login IN ({identifier_list})
                AND payload.pull_request.html_url IS NOT NULL
        ),
        pr_status AS (
            -- Get merge/close status for those PRs
            SELECT
                payload.pull_request.html_url as url,
                CAST(payload.pull_request.merged AS BOOLEAN) as is_merged,
                payload.pull_request.merged_at as merged_at,
                payload.pull_request.closed_at as closed_at,
                created_at,
                ROW_NUMBER() OVER (PARTITION BY payload.pull_request.html_url ORDER BY created_at DESC) as rn
            FROM read_parquet($status_patterns, union_by_name=true, filename=true)
            WHERE
                type = 'PullRequestEvent'
                AND payload.action = 'closed'
                AND payload.pull_request.html_url IS NOT NULL
                AND payload.pull_request.html_url IN (
                    SELECT DISTINCT url FROM review_events
                )
        )
        -- Join review events with PR status
        SELECT DISTINCT
            re.reviewer,
            re.url,
            re.reviewed_at,
            ps.merged_at,
            ps.closed_at
        FROM review_events re
        LEFT JOIN (SELECT * FROM pr_status WHERE rn = 1) ps ON re.url = ps.url
        ORDER BY re.reviewer, re.reviewed_at DESC
    """

    # Calculate the number of days covered, for reporting
    review_days = (end_date - start_date).days
    status_days = (end_date - status_start_date).days
    print(f"   Querying {review_days} days for reviews, {status_days} days for PR status...")
    print(f"   Agents: {', '.join(identifiers[:5])}{'...' if len(identifiers) > 5 else ''}")

    try:
        # Create a cache table name based on the date range
        cache_table_name = f"pr_cache_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}"

        # Check if a cache for this date range already exists
        cache_exists = conn.execute(f"""
            SELECT COUNT(*) FROM information_schema.tables
            WHERE table_name = '{cache_table_name}'
        """).fetchone()[0] > 0

        if cache_exists:
            print(f"   Using cached results from table {cache_table_name}")
            results = conn.execute(f"""
                SELECT reviewer, url, reviewed_at, merged_at, closed_at
                FROM {cache_table_name}
                WHERE reviewer IN ({identifier_list})
            """).fetchall()
        else:
            print(f"   Cache miss - executing full query and caching to {cache_table_name}")
            # Execute the query with parameters
            results = conn.execute(query, {'review_patterns': review_patterns, 'status_patterns': status_patterns}).fetchall()

            # Cache the complete results for all future queries in this date range
            if len(results) > 0:
                conn.execute(f"""
                    CREATE TABLE {cache_table_name} AS
                    SELECT * FROM (
                        SELECT UNNEST($1) as reviewer, UNNEST($2) as url,
                               UNNEST($3) as reviewed_at, UNNEST($4) as merged_at,
                               UNNEST($5) as closed_at
                    )
                """, [
                    [r[0] for r in results],
                    [r[1] for r in results],
                    [r[2] for r in results],
                    [r[3] for r in results],
                    [r[4] for r in results]
                ])
                print(f"   Cached {len(results)} results to {cache_table_name}")

        print(f"   Found {len(results)} total PR review records across all agents")

        # Group results by agent
        metadata_by_agent = defaultdict(list)
        for row in results:
            reviewer = row[0]
            url = row[1]
            reviewed_at = normalize_date_format(row[2]) if row[2] else None
            merged_at = normalize_date_format(row[3]) if row[3] else None
            closed_at = normalize_date_format(row[4]) if row[4] else None

            metadata_by_agent[reviewer].append({
                'url': url,
                'reviewed_at': reviewed_at,
                'merged_at': merged_at,
                'closed_at': closed_at,
            })

        # Print a breakdown by agent
        print("Results breakdown by agent:")
        for identifier in identifiers:
            count = len(metadata_by_agent.get(identifier, []))
            if count > 0:
                metadata = metadata_by_agent[identifier]
                merged_count = sum(1 for m in metadata if m['merged_at'] is not None)
                closed_count = sum(1 for m in metadata if m['closed_at'] is not None and m['merged_at'] is None)
                open_count = count - merged_count - closed_count
                print(f"   {identifier}: {count} PRs ({merged_count} merged, {closed_count} closed, {open_count} open)")

        # Convert the defaultdict to a regular dict
        return dict(metadata_by_agent)

    except Exception as e:
        print(f"   DuckDB error: {str(e)}")
        import traceback
        traceback.print_exc()
        return {}
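
# Illustrative only (not part of the original script): shape of the value returned by
# fetch_all_pr_metadata_single_query for a single hypothetical agent.
#
#   {
#       'example-bot[bot]': [
#           {'url': 'https://github.com/owner/repo/pull/42',
#            'reviewed_at': '2025-01-15T12:34:56Z',
#            'merged_at': '2025-01-16T08:00:00Z',
#            'closed_at': '2025-01-16T08:00:00Z'},
#       ],
#   }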

# =============================================================================
# HUGGINGFACE STORAGE FUNCTIONS WITH BATCH UPLOAD
# =============================================================================
def group_metadata_by_date(metadata_list):
    """
    Group review metadata by date (year.month.day) for daily storage.

    Returns dict: {(year, month, day): [metadata_list]}
    """
    grouped = defaultdict(list)
    for review_meta in metadata_list:
        reviewed_at = review_meta.get('reviewed_at')
        if not reviewed_at:
            continue
        try:
            dt = datetime.fromisoformat(reviewed_at.replace('Z', '+00:00'))
            key = (dt.year, dt.month, dt.day)
            grouped[key].append(review_meta)
        except Exception as e:
            print(f"Warning: Could not parse date '{reviewed_at}': {e}")
    return dict(grouped)
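
# Illustrative only (not part of the original script): two reviews on the same UTC day
# end up under one (year, month, day) key, which later becomes one daily JSONL file.
#
#   group_metadata_by_date([
#       {'url': 'https://github.com/o/r/pull/1', 'reviewed_at': '2025-01-15T10:00:00Z'},
#       {'url': 'https://github.com/o/r/pull/2', 'reviewed_at': '2025-01-15T18:30:00Z'},
#   ])
#   -> {(2025, 1, 15): [<both entries>]}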

def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type, commit_message, max_retries=MAX_RETRIES):
    """
    Upload a single file with exponential backoff retry logic.

    Args:
        api: HfApi instance
        local_path: Local file path
        repo_path: Path in repository
        repo_id: Repository ID
        repo_type: Repository type (e.g., "dataset")
        commit_message: Commit message
        max_retries: Maximum number of retries

    Returns:
        bool: True if successful, False otherwise
    """
    for attempt in range(max_retries):
        try:
            upload_file_with_backoff(
                api=api,
                path_or_fileobj=local_path,
                path_in_repo=repo_path,
                repo_id=repo_id,
                repo_type=repo_type,
                commit_message=commit_message
            )
            return True
        except Exception as e:
            if attempt < max_retries - 1:
                # Exponential backoff, capped at MAX_BACKOFF
                wait_time = min(INITIAL_BACKOFF * (2 ** attempt), MAX_BACKOFF)
                print(f"   Error on attempt {attempt + 1}/{max_retries}: {e}. Retrying in {wait_time}s...")
                time.sleep(wait_time)
            else:
                print(f"   Failed after {max_retries} attempts: {str(e)}")
                return False
    return False
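
# Illustrative only (not part of the original script): with INITIAL_BACKOFF = 60,
# MAX_BACKOFF = 3600 and MAX_RETRIES = 5, the waits between attempts follow
# min(60 * 2**attempt, 3600) for attempt = 0..3, i.e. 60s, 120s, 240s, 480s;
# there is no wait after the final (fifth) failed attempt.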

def batch_upload_review_metadata(all_metadata):
    """
    Upload review metadata for all agents with time gaps between uploads.
    Each agent's data is uploaded as separate daily files with retry logic.

    Args:
        all_metadata: Dictionary mapping agent identifier to list of PR metadata

    Returns:
        tuple: (success_count, error_count)
    """
    total_files = 0
    try:
        token = get_hf_token()
        if not token:
            raise Exception("No HuggingFace token found")
        api = HfApi(token=token)

        success_count = 0
        error_count = 0

        # First, calculate the total number of files to upload
        for agent_identifier, metadata_list in all_metadata.items():
            if metadata_list:
                grouped = group_metadata_by_date(metadata_list)
                total_files += len(grouped)

        print(f"\n{'='*80}")
        print(f"Starting batch upload: {len(all_metadata)} agents, {total_files} total files")
        print(f"Upload delay: {UPLOAD_DELAY_SECONDS}s between files")
        print(f"{'='*80}\n")

        file_count = 0
        for agent_idx, (agent_identifier, metadata_list) in enumerate(all_metadata.items(), 1):
            if not metadata_list:
                print(f"[{agent_idx}/{len(all_metadata)}] Skipping {agent_identifier} (no data)")
                continue

            # Group by date
            grouped = group_metadata_by_date(metadata_list)
            print(f"[{agent_idx}/{len(all_metadata)}] Uploading {len(grouped)} files for {agent_identifier}...")

            # Create a temporary directory for this agent's files
            agent_temp_dir = tempfile.mkdtemp()
            try:
                # Prepare all files locally
                local_files = []
                for (review_year, month, day), day_metadata in grouped.items():
                    filename = f"{review_year}.{month:02d}.{day:02d}.jsonl"
                    local_path = os.path.join(agent_temp_dir, filename)
                    repo_path = f"{agent_identifier}/{filename}"

                    # Sort by reviewed_at for better organization
                    day_metadata.sort(key=lambda x: x.get('reviewed_at', ''), reverse=True)

                    # Save to a temp file
                    save_jsonl(local_path, day_metadata)
                    local_files.append((local_path, repo_path, len(day_metadata)))

                # Upload each file with a delay between uploads
                agent_success = 0
                agent_error = 0
                for file_idx, (local_path, repo_path, review_count) in enumerate(local_files, 1):
                    file_count += 1
                    print(f"   [{file_count}/{total_files}] Uploading {repo_path} ({review_count} reviews)...", end='')

                    if upload_single_file_with_retry(
                        api=api,
                        local_path=local_path,
                        repo_path=repo_path,
                        repo_id=REVIEW_METADATA_REPO,
                        repo_type="dataset",
                        commit_message=f"Update {repo_path}",
                        max_retries=MAX_RETRIES
                    ):
                        print(" done")
                        agent_success += 1
                        success_count += 1
                    else:
                        print(" failed")
                        agent_error += 1
                        error_count += 1

                    # Delay between uploads (except after the last file)
                    if file_idx < len(local_files):
                        time.sleep(UPLOAD_DELAY_SECONDS)

                print(f"   Agent {agent_identifier}: {agent_success} uploaded, {agent_error} errors\n")
            finally:
                # Clean up the temp directory
                if os.path.exists(agent_temp_dir):
                    import shutil
                    shutil.rmtree(agent_temp_dir)

        print(f"\n{'='*80}")
        print("Batch upload complete!")
        print(f"   Total files: {total_files}")
        print(f"   Successful: {success_count}")
        print(f"   Errors: {error_count}")
        print(f"{'='*80}\n")

        return success_count, error_count

    except Exception as e:
        print(f"Error during batch upload: {str(e)}")
        import traceback
        traceback.print_exc()
        return 0, total_files

def load_agents_from_hf():
    """
    Load all agent metadata JSON files from the HuggingFace dataset.
    The github_identifier is extracted from the filename (e.g., 'agent-name[bot].json' -> 'agent-name[bot]').
    """
    try:
        api = HfApi()
        agents = []

        # List all files in the repository
        files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")

        # Filter for JSON files only
        json_files = [f for f in files if f.endswith('.json')]
        print(f"Found {len(json_files)} agent files in {AGENTS_REPO}")

        # Download and parse each JSON file
        for json_file in json_files:
            try:
                file_path = hf_hub_download_with_backoff(
                    repo_id=AGENTS_REPO,
                    filename=json_file,
                    repo_type="dataset"
                )
                with open(file_path, 'r') as f:
                    agent_data = json.load(f)

                # Only process agents with status == "public"
                if agent_data.get('status') != 'public':
                    continue

                # Extract github_identifier from the filename (remove the .json extension)
                github_identifier = json_file.replace('.json', '')
                agent_data['github_identifier'] = github_identifier
                agents.append(agent_data)
            except Exception as e:
                print(f"Warning: Could not load {json_file}: {str(e)}")
                continue

        print(f"Loaded {len(agents)} agents from HuggingFace")
        return agents
    except Exception as e:
        print(f"Could not load agents from HuggingFace: {str(e)}")
        return []

def get_pr_status_from_metadata(review_meta):
    """
    Derive PR status from the merged_at and closed_at fields.

    Returns:
        str: 'merged', 'closed', or 'open'
    """
    merged_at = review_meta.get('merged_at')
    closed_at = review_meta.get('closed_at')

    if merged_at:
        return 'merged'
    elif closed_at:
        return 'closed'
    else:
        return 'open'
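
# Illustrative only (not part of the original script):
#   get_pr_status_from_metadata({'merged_at': '2025-01-16T08:00:00Z', 'closed_at': '2025-01-16T08:00:00Z'})  -> 'merged'
#   get_pr_status_from_metadata({'merged_at': None, 'closed_at': '2025-01-16T08:00:00Z'})                    -> 'closed'
#   get_pr_status_from_metadata({'merged_at': None, 'closed_at': None})                                      -> 'open'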

def calculate_review_stats_from_metadata(metadata_list):
    """
    Calculate statistics from a list of review metadata.

    Returns:
        Dictionary with review metrics (total_reviews, merged_prs, acceptance_rate, etc.)
    """
    total_reviews = len(metadata_list)

    # Count merged PRs
    merged_prs = sum(1 for review_meta in metadata_list
                     if get_pr_status_from_metadata(review_meta) == 'merged')

    # Count rejected (closed but not merged) PRs
    rejected_prs = sum(1 for review_meta in metadata_list
                       if get_pr_status_from_metadata(review_meta) == 'closed')

    # Count pending PRs
    pending_prs = sum(1 for review_meta in metadata_list
                      if get_pr_status_from_metadata(review_meta) == 'open')

    # Calculate the acceptance rate (excluding pending PRs)
    completed_prs = merged_prs + rejected_prs
    acceptance_rate = (merged_prs / completed_prs * 100) if completed_prs > 0 else 0

    return {
        'total_reviews': total_reviews,
        'merged_prs': merged_prs,
        'pending_prs': pending_prs,
        'acceptance_rate': round(acceptance_rate, 2),
    }
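
# Illustrative only (not part of the original script): with 2 merged, 1 closed and
# 1 still-open PR, the acceptance rate counts only completed PRs: 2 / (2 + 1) * 100 = 66.67.
#
#   calculate_review_stats_from_metadata([
#       {'merged_at': '2025-01-02T00:00:00Z', 'closed_at': '2025-01-02T00:00:00Z'},
#       {'merged_at': '2025-01-03T00:00:00Z', 'closed_at': '2025-01-03T00:00:00Z'},
#       {'merged_at': None, 'closed_at': '2025-01-04T00:00:00Z'},
#       {'merged_at': None, 'closed_at': None},
#   ])
#   -> {'total_reviews': 4, 'merged_prs': 2, 'pending_prs': 1, 'acceptance_rate': 66.67}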

def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
    """
    Calculate monthly metrics for all agents for visualization.

    Args:
        all_metadata_dict: Dictionary mapping agent identifier to list of PR metadata
        agents: List of agent dictionaries with metadata

    Returns:
        dict: {
            'agents': list of agent names,
            'months': list of month labels (e.g., '2025-01'),
            'data': {
                agent_name: {
                    'acceptance_rates': list of acceptance rates by month,
                    'total_reviews': list of review counts by month,
                    'merged_prs': list of merged PR counts by month,
                }
            }
        }
    """
    # Map agent_identifier to agent_name
    identifier_to_name = {agent.get('github_identifier'): agent.get('name') for agent in agents if agent.get('github_identifier')}

    if not all_metadata_dict:
        return {'agents': [], 'months': [], 'data': {}}

    # Group by agent and month
    agent_month_data = defaultdict(lambda: defaultdict(list))

    # Flatten the dict of lists, attributing each review to its agent
    for agent_identifier, metadata_list in all_metadata_dict.items():
        for review_meta in metadata_list:
            reviewed_at = review_meta.get('reviewed_at')
            if not reviewed_at:
                continue

            # Resolve the agent_name from the identifier
            agent_name = identifier_to_name.get(agent_identifier, agent_identifier)

            try:
                dt = datetime.fromisoformat(reviewed_at.replace('Z', '+00:00'))
                month_key = f"{dt.year}-{dt.month:02d}"
                agent_month_data[agent_name][month_key].append(review_meta)
            except Exception as e:
                print(f"Warning: Could not parse date '{reviewed_at}': {e}")
                continue

    # Collect all unique months and sort them
    all_months = set()
    for agent_data in agent_month_data.values():
        all_months.update(agent_data.keys())
    months = sorted(all_months)

    # Calculate metrics for each agent and month
    result_data = {}
    for agent_name, month_dict in agent_month_data.items():
        acceptance_rates = []
        total_reviews_list = []
        merged_prs_list = []

        for month in months:
            reviews_in_month = month_dict.get(month, [])

            # Count merged PRs
            merged_count = sum(1 for review in reviews_in_month
                               if get_pr_status_from_metadata(review) == 'merged')

            # Count rejected PRs
            rejected_count = sum(1 for review in reviews_in_month
                                 if get_pr_status_from_metadata(review) == 'closed')

            # Total reviews
            total_count = len(reviews_in_month)

            # Acceptance rate (excluding pending PRs); None when nothing completed that month
            completed_count = merged_count + rejected_count
            acceptance_rate = (merged_count / completed_count * 100) if completed_count > 0 else None

            acceptance_rates.append(acceptance_rate)
            total_reviews_list.append(total_count)
            merged_prs_list.append(merged_count)

        result_data[agent_name] = {
            'acceptance_rates': acceptance_rates,
            'total_reviews': total_reviews_list,
            'merged_prs': merged_prs_list,
        }

    agents_list = sorted(agent_month_data.keys())

    return {
        'agents': agents_list,
        'months': months,
        'data': result_data
    }
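
# Illustrative only (not part of the original script): for one agent active in two
# months, each per-agent list is aligned index-wise with 'months', e.g.
#
#   {
#       'agents': ['Example Bot'],
#       'months': ['2025-01', '2025-02'],
#       'data': {'Example Bot': {'acceptance_rates': [50.0, None],
#                                'total_reviews': [3, 1],
#                                'merged_prs': [1, 0]}},
#   }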

def construct_leaderboard_from_metadata(all_metadata_dict, agents):
    """
    Construct the leaderboard from in-memory review metadata.

    Args:
        all_metadata_dict: Dictionary mapping agent identifier to list of PR metadata
        agents: List of agent dictionaries with metadata

    Returns:
        Dictionary of agent stats.
    """
    print("Constructing leaderboard from review metadata...")

    if not agents:
        print("No agents found")
        return {}

    print(f"Processing {len(agents)} agents")
    cache_dict = {}

    for agent in agents:
        identifier = agent.get('github_identifier')
        agent_name = agent.get('name', 'Unknown')

        # Get this agent's metadata from the dictionary
        bot_metadata = all_metadata_dict.get(identifier, [])

        # Calculate stats
        stats = calculate_review_stats_from_metadata(bot_metadata)

        cache_dict[identifier] = {
            'name': agent_name,
            'website': agent.get('website', 'N/A'),
            'github_identifier': identifier,
            **stats
        }

    print(f"Constructed cache with {len(cache_dict)} agent entries")
    return cache_dict

def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
    """
    Save leaderboard data and monthly metrics to the HuggingFace dataset as swe-review.json.

    Args:
        leaderboard_dict: Dictionary of agent stats from construct_leaderboard_from_metadata()
        monthly_metrics: Monthly metrics data from calculate_monthly_metrics_by_agent()

    Returns:
        bool: True if successful, False otherwise
    """
    try:
        token = get_hf_token()
        if not token:
            raise Exception("No HuggingFace token found")
        api = HfApi(token=token)

        filename = "swe-review.json"

        # Combine the leaderboard and monthly metrics
        combined_data = {
            'last_updated': datetime.now(timezone.utc).isoformat(),
            'leaderboard': leaderboard_dict,
            'monthly_metrics': monthly_metrics,
            'metadata': {
                'leaderboard_time_frame_days': LEADERBOARD_TIME_FRAME_DAYS
            }
        }

        # Save locally first
        with open(filename, 'w') as f:
            json.dump(combined_data, f, indent=2)

        try:
            # Upload to HuggingFace with retry logic
            print("Uploading leaderboard data...", end='')
            upload_file_with_backoff(
                api=api,
                path_or_fileobj=filename,
                path_in_repo=filename,
                repo_id=LEADERBOARD_REPO,
                repo_type="dataset"
            )
            print(" done")
            print(f"Saved leaderboard data to HuggingFace: {filename}")
            return True
        finally:
            # Always clean up the local file
            if os.path.exists(filename):
                os.remove(filename)
    except Exception as e:
        print(" failed")
        print(f"Error saving leaderboard data: {str(e)}")
        import traceback
        traceback.print_exc()
        return False

# =============================================================================
# MAIN MINING FUNCTION
# =============================================================================
def mine_all_agents():
    """
    Mine review metadata for all agents within LEADERBOARD_TIME_FRAME_DAYS and save it to HuggingFace.
    Uses ONE DuckDB query for ALL agents, then batch uploads with time gaps.
    """
    # Load agent metadata from HuggingFace
    agents = load_agents_from_hf()
    if not agents:
        print("No agents found in HuggingFace dataset")
        return

    # Extract all identifiers
    identifiers = [agent['github_identifier'] for agent in agents if agent.get('github_identifier')]
    if not identifiers:
        print("No valid agent identifiers found")
        return

    print(f"{'='*80}")
    print(f"Starting review metadata mining for {len(identifiers)} agents")
    print(f"Time frame: Last {LEADERBOARD_TIME_FRAME_DAYS} days")
    print(f"{'='*80}")

    # Initialize the DuckDB connection
    try:
        conn = get_duckdb_connection()
    except Exception as e:
        print(f"Failed to initialize DuckDB connection: {str(e)}")
        return

    # Define the time range: past LEADERBOARD_TIME_FRAME_DAYS (excluding today)
    current_time = datetime.now(timezone.utc)
    end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)

    try:
        # Use a single query for all agents
        all_metadata = fetch_all_pr_metadata_single_query(
            conn, identifiers, start_date, end_date
        )

        # Calculate summary statistics
        total_prs = sum(len(metadata_list) for metadata_list in all_metadata.values())
        agents_with_data = sum(1 for metadata_list in all_metadata.values() if metadata_list)

        print(f"\n{'='*80}")
        print("DuckDB query complete!")
        print(f"   Total agents: {len(agents)}")
        print(f"   Agents with data: {agents_with_data}")
        print(f"   Total PRs found: {total_prs}")
        print(f"{'='*80}")
    except Exception as e:
        print(f"Error during DuckDB fetch: {str(e)}")
        import traceback
        traceback.print_exc()
        return
    finally:
        # Close the DuckDB connection
        conn.close()

    # Batch upload review metadata with time gaps
    success_count, error_count = batch_upload_review_metadata(all_metadata)

    # Construct and save the leaderboard data
    print(f"{'='*80}")
    print("Constructing and saving leaderboard data...")
    print(f"{'='*80}\n")

    try:
        # Construct the leaderboard from in-memory data
        leaderboard_dict = construct_leaderboard_from_metadata(all_metadata, agents)

        # Calculate monthly metrics from in-memory data
        print("Calculating monthly metrics...")
        monthly_metrics = calculate_monthly_metrics_by_agent(all_metadata, agents)

        # Save to HuggingFace
        print("Saving leaderboard data to HuggingFace...")
        save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics)

        print(f"\n{'='*80}")
        print("ALL TASKS COMPLETE!")
        print(f"   Review metadata: {success_count} files uploaded, {error_count} errors")
        print(f"   Leaderboard entries: {len(leaderboard_dict)}")
        print(f"   Monthly data points: {len(monthly_metrics.get('months', []))} months")
        print(f"   Saved to: {LEADERBOARD_REPO}/swe-review.json")
        print(f"{'='*80}")
    except Exception as e:
        print(f"Failed to construct/save leaderboard data: {str(e)}")
        import traceback
        traceback.print_exc()

# =============================================================================
# ENTRY POINT
# =============================================================================
if __name__ == "__main__":
    mine_all_agents()