SWE-Review / msr.py
zhimin-z
refine
fc294eb
raw
history blame
37.7 kB
import json
import os
import time
import tempfile
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.errors import HfHubHTTPError
from dotenv import load_dotenv
import duckdb
import backoff
import requests.exceptions
# Load environment variables from a .env file (when one is present) so that
# os.getenv() lookups later in this module, such as HF_TOKEN, can see them.
load_dotenv()
# =============================================================================
# CONFIGURATION
# =============================================================================
# HuggingFace dataset that is listed for per-agent *.json metadata files
AGENTS_REPO = "SWE-Arena/bot_metadata"
# HuggingFace dataset that receives the per-agent daily review *.jsonl files
REVIEW_METADATA_REPO = "SWE-Arena/review_metadata"
# HuggingFace dataset for leaderboard data (swe-review.json)
LEADERBOARD_REPO = "SWE-Arena/leaderboard_metadata"
# Time frame for leaderboard: number of days of history that are mined
LEADERBOARD_TIME_FRAME_DAYS = 180
# Local GHArchive data directory (daily files are looked up as YYYY-MM-DD.parquet)
GHARCHIVE_DATA_DIR = "../gharchive/data"
# Persistent DuckDB database file used for caching query results
DUCKDB_CACHE_FILE = "gharchive_cache.duckdb"
# Upload configuration
# Delay (seconds) between individual file uploads to avoid rate limits
UPLOAD_DELAY_SECONDS = 2
# Maximum number of attempts made by upload_single_file_with_retry() per file
MAX_RETRIES = 5
# Initial wait in seconds (1 minute); doubled after every failed attempt
INITIAL_BACKOFF = 60
# Upper bound for the doubled wait, in seconds (60 minutes)
MAX_BACKOFF = 3600
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def load_jsonl(filename):
    """Read a JSON Lines file into a list of parsed records.

    A path that does not exist yields an empty list. Blank lines are ignored;
    lines that are not valid JSON are reported with a warning and skipped.
    """
    records = []
    if os.path.exists(filename):
        with open(filename, 'r', encoding='utf-8') as handle:
            for raw_line in handle:
                stripped = raw_line.strip()
                if not stripped:
                    continue
                try:
                    records.append(json.loads(stripped))
                except json.JSONDecodeError as e:
                    print(f"Warning: Skipping invalid JSON line: {e}")
    return records
def save_jsonl(filename, data):
    """Write every item of ``data`` to ``filename`` as one JSON document per line.

    The file is opened in write mode, so existing content is replaced.
    """
    with open(filename, 'w', encoding='utf-8') as out:
        out.writelines(json.dumps(record) + '\n' for record in data)
def normalize_date_format(date_string):
    """
    Convert a date/time value to ISO 8601 in UTC with a ``Z`` suffix.

    Handles both 'T' and whitespace-separated datetime strings (including
    newlines), bare-hour UTC offsets such as ``+00``, date-only strings, and
    ``datetime`` objects. Values carrying a UTC offset are converted to UTC
    before the ``Z`` suffix is applied; values without an offset are taken to
    already be in UTC.

    Examples:
    - 2025-10-15T23:23:47.983068 -> 2025-10-15T23:23:47Z
    - 2025-06-17 21:21:07+00 -> 2025-06-17T21:21:07Z
    - 2025-06-17T21:21:07+02:00 -> 2025-06-17T19:21:07Z

    Args:
        date_string: Date string (or datetime) to normalize.

    Returns:
        The normalized string, 'N/A' for empty input, or the input unchanged
        when it cannot be parsed (a warning is printed in that case).
    """
    if not date_string or date_string == 'N/A':
        return 'N/A'
    try:
        import re
        if isinstance(date_string, datetime):
            dt = date_string
        else:
            # Collapse every run of whitespace (spaces, newlines, tabs) and use
            # 'T' as the date/time separator for ISO format compatibility
            candidate = re.sub(r'\s+', ' ', date_string.strip()).replace(' ', 'T')
            # Complete a bare-hour offset (+00 / -05 -> +00:00 / -05:00). Only the
            # time portion is inspected so that the "-DD" tail of a date-only
            # string is not mistaken for an offset.
            _, separator, time_part = candidate.partition('T')
            if separator and re.search(r'[+-]\d{2}$', time_part):
                candidate += ':00'
            dt = datetime.fromisoformat(candidate.replace('Z', '+00:00'))
        # The 'Z' suffix asserts UTC, so shift offset-aware values to UTC first
        if dt.tzinfo is not None:
            dt = dt.astimezone(timezone.utc)
        return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
    except Exception as e:
        print(f"Warning: Could not parse date '{date_string}': {e}")
        return date_string
def get_hf_token():
    """Return the value of the ``HF_TOKEN`` environment variable.

    A warning is printed when the variable is unset or empty; the (falsy)
    value is still returned so callers can decide how to react.
    """
    hf_token = os.environ.get('HF_TOKEN')
    if hf_token:
        return hf_token
    print("Warning: HF_TOKEN not found in environment variables")
    return hf_token
# =============================================================================
# HUGGINGFACE API WRAPPERS WITH ENHANCED BACKOFF
# =============================================================================
def is_retryable_error(e):
    """
    Check if exception is retryable (rate limit or timeout error).

    Args:
        e: The exception raised by a HuggingFace Hub / HTTP call.

    Returns:
        bool: True for an HTTP 429 response, a ``requests`` timeout, or any
        exception whose message mentions a timeout; False otherwise.
    """
    # Rate limit error (429). The error may carry no response object, so the
    # status code is read defensively instead of via e.response.status_code.
    if isinstance(e, HfHubHTTPError):
        response = getattr(e, 'response', None)
        if getattr(response, 'status_code', None) == 429:
            return True
    # requests.exceptions.Timeout is the base class of ReadTimeout and
    # ConnectTimeout, so a single check covers all three.
    if isinstance(e, requests.exceptions.Timeout):
        return True
    # Timeout wrapped in another exception type (e.g. HfHubHTTPError)
    error_str = str(e).lower()
    return 'timeout' in error_str or 'timed out' in error_str
@backoff.on_exception(
    backoff.expo,
    # Exception already covers every more specific type; the giveup predicate
    # below decides which errors are actually retried.
    Exception,
    max_tries=8,
    # backoff.expo yields factor * base**n. factor=300 (with the default base
    # of 2) produces 300s, 600s, 1200s, ... capped at max_value. Passing
    # base=300 instead would produce 1s, 300s and then jump to the cap.
    factor=300,
    max_value=3600,
    giveup=lambda e: not is_retryable_error(e),
    on_backoff=lambda details: print(
        f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/8..."
    )
)
def list_repo_files_with_backoff(api, **kwargs):
    """Wrapper for api.list_repo_files() with exponential backoff for retryable errors.

    Args:
        api: Object exposing ``list_repo_files`` (an ``HfApi`` instance).
        **kwargs: Forwarded unchanged to ``api.list_repo_files``.

    Returns:
        Whatever ``api.list_repo_files`` returns. Non-retryable errors, and
        retryable ones that persist for 8 tries, propagate to the caller.
    """
    return api.list_repo_files(**kwargs)
@backoff.on_exception(
    backoff.expo,
    # Exception already covers every more specific type; the giveup predicate
    # below decides which errors are actually retried.
    Exception,
    max_tries=8,
    # backoff.expo yields factor * base**n. factor=300 (with the default base
    # of 2) produces 300s, 600s, 1200s, ... capped at max_value. Passing
    # base=300 instead would produce 1s, 300s and then jump to the cap.
    factor=300,
    max_value=3600,
    giveup=lambda e: not is_retryable_error(e),
    on_backoff=lambda details: print(
        f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/8..."
    )
)
def hf_hub_download_with_backoff(**kwargs):
    """Wrapper for hf_hub_download() with exponential backoff for retryable errors.

    Args:
        **kwargs: Forwarded unchanged to ``hf_hub_download``.

    Returns:
        Whatever ``hf_hub_download`` returns. Non-retryable errors, and
        retryable ones that persist for 8 tries, propagate to the caller.
    """
    return hf_hub_download(**kwargs)
@backoff.on_exception(
    backoff.expo,
    # Exception already covers every more specific type; the giveup predicate
    # below decides which errors are actually retried.
    Exception,
    max_tries=8,
    # backoff.expo yields factor * base**n. factor=300 (with the default base
    # of 2) produces 300s, 600s, 1200s, ... capped at max_value. Passing
    # base=300 instead would produce 1s, 300s and then jump to the cap.
    factor=300,
    max_value=3600,
    giveup=lambda e: not is_retryable_error(e),
    on_backoff=lambda details: print(
        f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/8..."
    )
)
def upload_file_with_backoff(api, **kwargs):
    """Wrapper for api.upload_file() with exponential backoff for retryable errors.

    Args:
        api: Object exposing ``upload_file`` (an ``HfApi`` instance).
        **kwargs: Forwarded unchanged to ``api.upload_file``.

    Returns:
        Whatever ``api.upload_file`` returns. Non-retryable errors, and
        retryable ones that persist for 8 tries, propagate to the caller.
    """
    return api.upload_file(**kwargs)
@backoff.on_exception(
    backoff.expo,
    # Exception already covers every more specific type; the giveup predicate
    # below decides which errors are actually retried.
    Exception,
    max_tries=8,
    # backoff.expo yields factor * base**n. factor=300 (with the default base
    # of 2) produces 300s, 600s, 1200s, ... capped at max_value. Passing
    # base=300 instead would produce 1s, 300s and then jump to the cap.
    factor=300,
    max_value=3600,
    giveup=lambda e: not is_retryable_error(e),
    on_backoff=lambda details: print(
        f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/8..."
    )
)
def upload_folder_with_backoff(api, **kwargs):
    """Wrapper for api.upload_folder() with exponential backoff for retryable errors.

    Args:
        api: Object exposing ``upload_folder`` (an ``HfApi`` instance).
        **kwargs: Forwarded unchanged to ``api.upload_folder``.

    Returns:
        Whatever ``api.upload_folder`` returns. Non-retryable errors, and
        retryable ones that persist for 8 tries, propagate to the caller.
    """
    return api.upload_folder(**kwargs)
def get_duckdb_connection():
    """
    Initialize DuckDB connection with persistent database and tuned settings.

    The database file is DUCKDB_CACHE_FILE, so cached tables survive between
    runs. If applying any setting fails, the connection is closed before the
    error propagates so the database file is not left open.

    Returns:
        DuckDB connection object

    Raises:
        Whatever ``duckdb.connect`` or ``conn.execute`` raises.
    """
    # Use persistent database for caching results
    conn = duckdb.connect(DUCKDB_CACHE_FILE)
    # NOTE(review): these values are hard-coded for a large host (earlier
    # comments mention 96 cores / 754GB RAM) - confirm before running elsewhere.
    settings = (
        "SET threads TO 48;",  # 48 worker threads
        "SET memory_limit = '400GB';",  # Cap DuckDB memory usage
        "SET preserve_insertion_order = false;",  # Better parallelization
        "SET enable_object_cache = true;",  # Cache objects for reuse
        "SET temp_directory = '/tmp/duckdb_temp';",  # Spill location if needed
    )
    try:
        for statement in settings:
            conn.execute(statement)
    except Exception:
        conn.close()
        raise
    return conn
def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DIR):
    """
    Build the list of daily GHArchive parquet paths covering a date range.

    Both bounds are truncated to midnight and the range is inclusive of the
    end day. A start later than the end yields an empty list.

    Args:
        start_date: Start datetime
        end_date: End datetime
        data_dir: Directory containing GHArchive data files

    Returns:
        List of file paths, one per day, e.g. <data_dir>/2024-11-15.parquet
    """
    first_day = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
    last_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
    # Number of whole days between the bounds, plus one for the end day itself
    day_count = (last_day - first_day).days + 1
    days = (first_day + timedelta(days=offset) for offset in range(day_count))
    return [
        os.path.join(data_dir, f"{day.strftime('%Y-%m-%d')}.parquet")
        for day in days
    ]
# =============================================================================
# DUCKDB QUERY FUNCTIONS
# =============================================================================
def fetch_all_pr_metadata_single_query(conn, identifiers, start_date, end_date):
    """
    Fetch PR review metadata for ALL agents using ONE comprehensive DuckDB query.

    This query combines:
    1. Review activity for all agents (PullRequestReviewEvent, IssueCommentEvent
       on pull requests, PullRequestReviewCommentEvent)
    2. PR status (most recent PullRequestEvent with action='closed' per PR)

    Results are cached in a DuckDB table whose name encodes the date range and
    a digest of the identifier set, so a changed agent list never reads a cache
    that was built for different agents.

    Args:
        conn: DuckDB connection instance
        identifiers: List of GitHub usernames/bot identifiers
        start_date: Start datetime (timezone-aware)
        end_date: End datetime (timezone-aware)

    Returns:
        Dictionary mapping agent identifier to list of PR metadata:
        {
            'agent-identifier': [
                {
                    'url': PR URL,
                    'reviewed_at': Review timestamp,
                    'merged_at': Merge timestamp (if merged, else None),
                    'closed_at': Close timestamp (if closed, else None)
                },
                ...
            ],
            ...
        }
        An empty dict is returned when no identifiers are given or when the
        query fails (the error and traceback are printed).
    """
    import hashlib

    if not identifiers:
        # "IN ()" is not valid SQL, and there is nothing to look up anyway
        print("No agent identifiers supplied - skipping DuckDB query")
        return {}

    print(f"Querying DuckDB for ALL {len(identifiers)} agents in ONE QUERY")
    print(f" Time range: {start_date.strftime('%Y-%m-%d')} to {end_date.strftime('%Y-%m-%d')}")

    # Generate file path patterns for review period
    review_patterns = generate_file_path_patterns(start_date, end_date)
    # Generate file path patterns for PR status (use same lookback as reviews)
    status_start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
    status_patterns = generate_file_path_patterns(status_start_date, end_date)

    # Build identifier list for IN clause. Each identifier becomes a SQL string
    # literal with embedded single quotes doubled, so an unusual login cannot
    # break (or inject into) the query text.
    identifier_list = ', '.join(
        "'" + str(identifier).replace("'", "''") + "'" for identifier in identifiers
    )

    # Build comprehensive query with CTEs using parameterized file lists (Parquet optimized)
    query = f"""
    WITH review_events AS (
        -- Get all review events for ALL agents
        SELECT
            payload.pull_request.html_url as url,
            COALESCE(
                payload.review.submitted_at,
                CAST(created_at AS VARCHAR)
            ) as reviewed_at,
            actor.login as reviewer,
            repo.name as repo_name,
            CAST(payload.pull_request.number AS INTEGER) as pr_number
        FROM read_parquet($review_patterns, union_by_name=true, filename=true)
        WHERE
            type = 'PullRequestReviewEvent'
            AND actor.login IN ({identifier_list})
            AND payload.pull_request.html_url IS NOT NULL
        UNION ALL
        -- Get PR comments (IssueCommentEvent on PRs)
        SELECT
            payload.issue.html_url as url,
            CAST(created_at AS VARCHAR) as reviewed_at,
            actor.login as reviewer,
            repo.name as repo_name,
            CAST(payload.issue.number AS INTEGER) as pr_number
        FROM read_parquet($review_patterns, union_by_name=true, filename=true)
        WHERE
            type = 'IssueCommentEvent'
            AND actor.login IN ({identifier_list})
            AND payload.issue.pull_request.url IS NOT NULL
            AND payload.issue.html_url IS NOT NULL
        UNION ALL
        -- Get review comments (PullRequestReviewCommentEvent)
        SELECT
            payload.pull_request.html_url as url,
            CAST(created_at AS VARCHAR) as reviewed_at,
            actor.login as reviewer,
            repo.name as repo_name,
            CAST(payload.pull_request.number AS INTEGER) as pr_number
        FROM read_parquet($review_patterns, union_by_name=true, filename=true)
        WHERE
            type = 'PullRequestReviewCommentEvent'
            AND actor.login IN ({identifier_list})
            AND payload.pull_request.html_url IS NOT NULL
    ),
    pr_status AS (
        -- Get merge/close status for those PRs
        SELECT
            payload.pull_request.html_url as url,
            CAST(payload.pull_request.merged AS BOOLEAN) as is_merged,
            payload.pull_request.merged_at as merged_at,
            payload.pull_request.closed_at as closed_at,
            created_at,
            ROW_NUMBER() OVER (PARTITION BY payload.pull_request.html_url ORDER BY created_at DESC) as rn
        FROM read_parquet($status_patterns, union_by_name=true, filename=true)
        WHERE
            type = 'PullRequestEvent'
            AND payload.action = 'closed'
            AND payload.pull_request.html_url IS NOT NULL
            AND payload.pull_request.html_url IN (
                SELECT DISTINCT url FROM review_events
            )
    )
    -- Join review events with PR status
    SELECT DISTINCT
        re.reviewer,
        re.url,
        re.reviewed_at,
        ps.merged_at,
        ps.closed_at
    FROM review_events re
    LEFT JOIN (SELECT * FROM pr_status WHERE rn = 1) ps ON re.url = ps.url
    ORDER BY re.reviewer, re.reviewed_at DESC
    """

    # Calculate number of days for reporting
    review_days = (end_date - start_date).days
    status_days = (end_date - status_start_date).days
    print(f" Querying {review_days} days for reviews, {status_days} days for PR status...")
    print(f" Agents: {', '.join(identifiers[:5])}{'...' if len(identifiers) > 5 else ''}")

    try:
        # Create cache table name based on date range AND the set of agents.
        # Without the digest, a table cached for an older agent list would be
        # reused and newly added agents would silently get no results.
        identifier_digest = hashlib.sha256(
            '\n'.join(sorted({str(identifier) for identifier in identifiers})).encode('utf-8')
        ).hexdigest()[:12]
        cache_table_name = (
            f"pr_cache_{start_date.strftime('%Y%m%d')}_{end_date.strftime('%Y%m%d')}"
            f"_{identifier_digest}"
        )

        # Check if cache exists
        cache_exists = conn.execute(f"""
            SELECT COUNT(*) FROM information_schema.tables
            WHERE table_name = '{cache_table_name}'
        """).fetchone()[0] > 0

        if cache_exists:
            print(f" Using cached results from table {cache_table_name}")
            results = conn.execute(f"""
                SELECT reviewer, url, reviewed_at, merged_at, closed_at
                FROM {cache_table_name}
                WHERE reviewer IN ({identifier_list})
            """).fetchall()
        else:
            print(f" Cache miss - executing full query and caching to {cache_table_name}")
            # Execute query with parameters
            results = conn.execute(
                query,
                {'review_patterns': review_patterns, 'status_patterns': status_patterns}
            ).fetchall()
            # Cache the complete results for all future queries in this date range
            if len(results) > 0:
                conn.execute(f"""
                    CREATE TABLE {cache_table_name} AS
                    SELECT * FROM (
                        SELECT UNNEST($1) as reviewer, UNNEST($2) as url,
                               UNNEST($3) as reviewed_at, UNNEST($4) as merged_at,
                               UNNEST($5) as closed_at
                    )
                """, [
                    [r[0] for r in results],
                    [r[1] for r in results],
                    [r[2] for r in results],
                    [r[3] for r in results],
                    [r[4] for r in results]
                ])
                print(f" Cached {len(results)} results to {cache_table_name}")

        print(f" Found {len(results)} total PR review records across all agents")

        # Group results by agent
        metadata_by_agent = defaultdict(list)
        for row in results:
            reviewer = row[0]
            url = row[1]
            reviewed_at = normalize_date_format(row[2]) if row[2] else None
            merged_at = normalize_date_format(row[3]) if row[3] else None
            closed_at = normalize_date_format(row[4]) if row[4] else None
            metadata_by_agent[reviewer].append({
                'url': url,
                'reviewed_at': reviewed_at,
                'merged_at': merged_at,
                'closed_at': closed_at,
            })

        # Print breakdown by agent
        print(f"Results breakdown by agent:")
        for identifier in identifiers:
            count = len(metadata_by_agent.get(identifier, []))
            if count > 0:
                metadata = metadata_by_agent[identifier]
                merged_count = sum(1 for m in metadata if m['merged_at'] is not None)
                closed_count = sum(1 for m in metadata if m['closed_at'] is not None and m['merged_at'] is None)
                open_count = count - merged_count - closed_count
                print(f" {identifier}: {count} PRs ({merged_count} merged, {closed_count} closed, {open_count} open)")

        # Convert defaultdict to regular dict
        return dict(metadata_by_agent)
    except Exception as e:
        print(f" DuckDB error: {str(e)}")
        import traceback
        traceback.print_exc()
        return {}
# =============================================================================
# HUGGINGFACE STORAGE FUNCTIONS WITH BATCH UPLOAD
# =============================================================================
def group_metadata_by_date(metadata_list):
    """
    Bucket review metadata by the calendar day of its ``reviewed_at`` timestamp.

    Entries without a ``reviewed_at`` value are skipped silently; entries whose
    timestamp cannot be parsed are skipped with a warning.

    Returns dict: {(year, month, day): [metadata_list]}
    """
    buckets = {}
    for entry in metadata_list:
        timestamp = entry.get('reviewed_at')
        if timestamp:
            try:
                # fromisoformat needs an explicit offset rather than the 'Z' suffix
                parsed = datetime.fromisoformat(timestamp.replace('Z', '+00:00'))
            except Exception as e:
                print(f"Warning: Could not parse date '{timestamp}': {e}")
            else:
                day_key = (parsed.year, parsed.month, parsed.day)
                buckets.setdefault(day_key, []).append(entry)
    return buckets
def upload_single_file_with_retry(api, local_path, repo_path, repo_id, repo_type, commit_message, max_retries=MAX_RETRIES):
    """
    Upload one file, retrying with a doubling wait after each failure.

    The wait starts at INITIAL_BACKOFF seconds, doubles per failed attempt and
    is capped at MAX_BACKOFF. Any exception counts as a failed attempt.

    Args:
        api: HfApi instance
        local_path: Local file path
        repo_path: Path in repository
        repo_id: Repository ID
        repo_type: Repository type (e.g., "dataset")
        commit_message: Commit message
        max_retries: Maximum number of attempts

    Returns:
        bool: True if successful, False otherwise
    """
    final_attempt = max_retries - 1
    attempt = 0
    while attempt < max_retries:
        try:
            upload_file_with_backoff(
                api=api,
                path_or_fileobj=local_path,
                path_in_repo=repo_path,
                repo_id=repo_id,
                repo_type=repo_type,
                commit_message=commit_message
            )
        except Exception as e:
            if attempt >= final_attempt:
                print(f" Failed after {max_retries} attempts: {str(e)}")
                return False
            # Doubling wait, bounded by MAX_BACKOFF
            wait_time = min(INITIAL_BACKOFF * (2 ** attempt), MAX_BACKOFF)
            print(f" {e} error on attempt {attempt + 1}/{max_retries}. Retrying in {wait_time}s...")
            time.sleep(wait_time)
            attempt += 1
        else:
            return True
    # Only reached when max_retries allows no attempt at all
    return False
def batch_upload_review_metadata(all_metadata):
    """
    Upload review metadata for all agents with time gaps between uploads.

    Each agent's data is written to one JSONL file per day
    (``<agent>/<YYYY.MM.DD>.jsonl``) and uploaded with retry logic. A pause of
    UPLOAD_DELAY_SECONDS is inserted between every two consecutive uploads,
    including across agent boundaries.

    Args:
        all_metadata: Dictionary mapping agent identifier to list of PR metadata

    Returns:
        tuple: (success_count, error_count). If the batch aborts with an
        unexpected error, every file that was not uploaded successfully is
        counted as an error.
    """
    # Initialised up front so the failure path can report accurate counts
    success_count = 0
    error_count = 0
    total_files = 0
    try:
        token = get_hf_token()
        if not token:
            raise Exception("No HuggingFace token found")
        api = HfApi(token=token)

        # Group each agent's records by day once; the result is used both for
        # the total file count and for the uploads themselves.
        grouped_by_agent = {
            agent_identifier: group_metadata_by_date(metadata_list) if metadata_list else {}
            for agent_identifier, metadata_list in all_metadata.items()
        }
        total_files = sum(len(grouped) for grouped in grouped_by_agent.values())

        print(f"\n{'='*80}")
        print(f"Starting batch upload: {len(all_metadata)} agents, {total_files} total files")
        print(f"Upload delay: {UPLOAD_DELAY_SECONDS}s between files")
        print(f"{'='*80}\n")

        file_count = 0
        for agent_idx, (agent_identifier, grouped) in enumerate(grouped_by_agent.items(), 1):
            if not all_metadata[agent_identifier]:
                print(f"[{agent_idx}/{len(all_metadata)}] Skipping {agent_identifier} (no data)")
                continue
            print(f"[{agent_idx}/{len(all_metadata)}] Uploading {len(grouped)} files for {agent_identifier}...")

            # Temporary directory for this agent; removed when the block exits
            with tempfile.TemporaryDirectory() as agent_temp_dir:
                # Prepare all files locally
                local_files = []
                for (review_year, month, day), day_metadata in grouped.items():
                    filename = f"{review_year}.{month:02d}.{day:02d}.jsonl"
                    local_path = os.path.join(agent_temp_dir, filename)
                    repo_path = f"{agent_identifier}/{filename}"
                    # Sort by reviewed_at (newest first) for better organization
                    day_metadata.sort(key=lambda x: x.get('reviewed_at', ''), reverse=True)
                    save_jsonl(local_path, day_metadata)
                    local_files.append((local_path, repo_path, len(day_metadata)))

                # Upload each file with delay
                agent_success = 0
                agent_error = 0
                for local_path, repo_path, review_count in local_files:
                    file_count += 1
                    print(f" [{file_count}/{total_files}] Uploading {repo_path} ({review_count} reviews)...", end='')
                    if upload_single_file_with_retry(
                        api=api,
                        local_path=local_path,
                        repo_path=repo_path,
                        repo_id=REVIEW_METADATA_REPO,
                        repo_type="dataset",
                        commit_message=f"Update {repo_path}",
                        max_retries=MAX_RETRIES
                    ):
                        print(" ")
                        agent_success += 1
                        success_count += 1
                    else:
                        print(" ")
                        agent_error += 1
                        error_count += 1
                    # Pause before the next upload (skipped after the very last file)
                    if file_count < total_files:
                        time.sleep(UPLOAD_DELAY_SECONDS)
                print(f" Agent {agent_identifier}: {agent_success} uploaded, {agent_error} errors\n")

        print(f"\n{'='*80}")
        print(f"Batch upload complete!")
        print(f" Total files: {total_files}")
        print(f" Successful: {success_count}")
        print(f" Errors: {error_count}")
        print(f"{'='*80}\n")
        return success_count, error_count
    except Exception as e:
        print(f"Error during batch upload: {str(e)}")
        import traceback
        traceback.print_exc()
        # Keep the successes that did happen; everything else counts as failed
        return success_count, total_files - success_count
def load_agents_from_hf():
    """
    Load all agent metadata JSON files from HuggingFace dataset.

    Only agents whose ``status`` is "public" are kept. The github_identifier is
    extracted from the filename (e.g., 'agent-name[bot].json' -> 'agent-name[bot]')
    and stored on each agent dictionary.

    Returns:
        list: Agent dictionaries; empty if the repository could not be listed.
        Files that fail to download or parse are skipped with a warning.
    """
    try:
        api = HfApi()
        agents = []
        # List all files in the repository
        files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")
        # Filter for JSON files only
        json_files = [f for f in files if f.endswith('.json')]
        print(f"Found {len(json_files)} agent files in {AGENTS_REPO}")

        # Download and parse each JSON file
        for json_file in json_files:
            try:
                file_path = hf_hub_download_with_backoff(
                    repo_id=AGENTS_REPO,
                    filename=json_file,
                    repo_type="dataset"
                )
                # Explicit encoding so parsing does not depend on the platform default
                with open(file_path, 'r', encoding='utf-8') as f:
                    agent_data = json.load(f)
                # Only process agents with status == "public"
                if agent_data.get('status') != 'public':
                    continue
                # Strip only the trailing extension; str.replace() would also
                # remove any '.json' occurring inside the identifier itself.
                github_identifier = json_file[:-len('.json')]
                agent_data['github_identifier'] = github_identifier
                agents.append(agent_data)
            except Exception as e:
                print(f"Warning: Could not load {json_file}: {str(e)}")
                continue

        print(f"Loaded {len(agents)} agents from HuggingFace")
        return agents
    except Exception as e:
        print(f"Could not load agents from HuggingFace: {str(e)}")
        return []
def get_pr_status_from_metadata(review_meta):
    """
    Classify a PR from its ``merged_at`` / ``closed_at`` fields.

    A truthy ``merged_at`` wins over ``closed_at``; a PR with neither is open.

    Returns:
        str: 'merged', 'closed', or 'open'
    """
    if review_meta.get('merged_at'):
        return 'merged'
    if review_meta.get('closed_at'):
        return 'closed'
    return 'open'
def calculate_review_stats_from_metadata(metadata_list):
    """
    Summarize a list of review metadata into leaderboard statistics.

    The acceptance rate is merged / (merged + closed) as a percentage rounded
    to two decimals; open PRs are excluded and the rate is 0 when no PR has
    been completed.

    Returns:
        Dictionary with total_reviews, merged_prs, pending_prs and acceptance_rate
    """
    # Tally every record by its derived status in a single pass
    tally = {'merged': 0, 'closed': 0, 'open': 0}
    for review_meta in metadata_list:
        tally[get_pr_status_from_metadata(review_meta)] += 1

    merged_prs = tally['merged']
    completed_prs = merged_prs + tally['closed']
    if completed_prs > 0:
        acceptance_rate = merged_prs / completed_prs * 100
    else:
        acceptance_rate = 0

    return {
        'total_reviews': len(metadata_list),
        'merged_prs': merged_prs,
        'pending_prs': tally['open'],
        'acceptance_rate': round(acceptance_rate, 2),
    }
def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
    """
    Calculate monthly metrics for all agents for visualization.

    Reviews are bucketed by agent display name and by the month of their
    ``reviewed_at`` timestamp. An agent without a usable ``name`` (missing,
    None or empty), or one absent from ``agents``, is labelled with its
    identifier so that result keys are always strings.

    Args:
        all_metadata_dict: Dictionary mapping agent identifier to list of PR metadata
        agents: List of agent dictionaries with metadata

    Returns:
        dict: {
            'agents': list of agent names,
            'months': list of month labels (e.g., '2025-01'),
            'data': {
                agent_name: {
                    'acceptance_rates': list of acceptance rates by month
                        (None for months without a merged or closed PR),
                    'total_reviews': list of review counts by month,
                    'merged_prs': list of merged PR counts by month,
                }
            }
        }
    """
    # Create mapping from agent_identifier to agent_name, falling back to the
    # identifier itself when the agent has no name.
    identifier_to_name = {}
    for agent in agents:
        identifier = agent.get('github_identifier')
        if identifier:
            identifier_to_name[identifier] = agent.get('name') or identifier

    if not all_metadata_dict:
        return {'agents': [], 'months': [], 'data': {}}

    # Group by agent and month
    agent_month_data = defaultdict(lambda: defaultdict(list))
    for agent_identifier, metadata_list in all_metadata_dict.items():
        agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
        for review_meta in metadata_list:
            reviewed_at = review_meta.get('reviewed_at')
            if not reviewed_at:
                continue
            try:
                dt = datetime.fromisoformat(reviewed_at.replace('Z', '+00:00'))
            except Exception as e:
                print(f"Warning: Could not parse date '{reviewed_at}': {e}")
                continue
            month_key = f"{dt.year}-{dt.month:02d}"
            agent_month_data[agent_name][month_key].append(review_meta)

    # Get all unique months and sort them
    months = sorted({
        month
        for month_dict in agent_month_data.values()
        for month in month_dict
    })

    # Calculate metrics for each agent and month
    result_data = {}
    for agent_name, month_dict in agent_month_data.items():
        acceptance_rates = []
        total_reviews_list = []
        merged_prs_list = []
        for month in months:
            reviews_in_month = month_dict.get(month, [])
            statuses = [get_pr_status_from_metadata(review) for review in reviews_in_month]
            merged_count = statuses.count('merged')
            rejected_count = statuses.count('closed')
            # Calculate acceptance rate (exclude pending PRs)
            completed_count = merged_count + rejected_count
            acceptance_rate = (merged_count / completed_count * 100) if completed_count > 0 else None
            acceptance_rates.append(acceptance_rate)
            total_reviews_list.append(len(reviews_in_month))
            merged_prs_list.append(merged_count)
        result_data[agent_name] = {
            'acceptance_rates': acceptance_rates,
            'total_reviews': total_reviews_list,
            'merged_prs': merged_prs_list,
        }

    return {
        'agents': sorted(agent_month_data),
        'months': months,
        'data': result_data
    }
def construct_leaderboard_from_metadata(all_metadata_dict, agents):
    """
    Build the leaderboard entries from in-memory review metadata.

    Every agent gets an entry, including agents with no metadata (their
    statistics are computed from an empty list).

    Args:
        all_metadata_dict: Dictionary mapping agent identifier to list of PR metadata
        agents: List of agent dictionaries with metadata

    Returns:
        Dictionary of agent stats keyed by github_identifier.
    """
    print("Constructing leaderboard from review metadata...")
    if not agents:
        print("No agents found")
        return {}
    print(f"Processing {len(agents)} agents")

    leaderboard = {}
    for agent in agents:
        identifier = agent.get('github_identifier')
        entry = {
            'name': agent.get('name', 'Unknown'),
            'website': agent.get('website', 'N/A'),
            'github_identifier': identifier,
        }
        # Statistics are appended after the descriptive fields
        agent_reviews = all_metadata_dict.get(identifier, [])
        entry.update(calculate_review_stats_from_metadata(agent_reviews))
        leaderboard[identifier] = entry

    print(f"Constructed cache with {len(leaderboard)} agent entries")
    return leaderboard
def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
    """
    Save leaderboard data and monthly metrics to HuggingFace dataset as swe-review.json.

    The combined document is written to a local file of the same name in the
    current working directory, uploaded, and the local file is removed
    afterwards whether or not writing or uploading succeeded.

    Args:
        leaderboard_dict: Dictionary of agent stats from construct_leaderboard_from_metadata()
        monthly_metrics: Monthly metrics data from calculate_monthly_metrics_by_agent()

    Returns:
        bool: True if successful, False otherwise
    """
    filename = "swe-review.json"
    try:
        token = get_hf_token()
        if not token:
            raise Exception("No HuggingFace token found")
        api = HfApi(token=token)

        # Combine leaderboard and monthly metrics
        combined_data = {
            'last_updated': datetime.now(timezone.utc).isoformat(),
            'leaderboard': leaderboard_dict,
            'monthly_metrics': monthly_metrics,
            'metadata': {
                'leaderboard_time_frame_days': LEADERBOARD_TIME_FRAME_DAYS
            }
        }

        try:
            # Save locally first. Writing happens inside the try block so that a
            # failed json.dump() does not leave a partial file behind.
            with open(filename, 'w', encoding='utf-8') as f:
                json.dump(combined_data, f, indent=2)

            # Upload to HuggingFace with retry logic
            print(f"Uploading leaderboard data...", end='')
            upload_file_with_backoff(
                api=api,
                path_or_fileobj=filename,
                path_in_repo=filename,
                repo_id=LEADERBOARD_REPO,
                repo_type="dataset"
            )
            print(" ")
            print(f"Saved leaderboard data to HuggingFace: {filename}")
            return True
        finally:
            # Always clean up local file
            if os.path.exists(filename):
                os.remove(filename)
    except Exception as e:
        print(f" ")
        print(f"Error saving leaderboard data: {str(e)}")
        import traceback
        traceback.print_exc()
        return False
# =============================================================================
# MAIN MINING FUNCTION
# =============================================================================
def mine_all_agents():
    """
    Run the whole mining pipeline for every agent.

    Steps: load the agents from HuggingFace, fetch their review metadata for
    the last LEADERBOARD_TIME_FRAME_DAYS days with ONE DuckDB query, batch
    upload the per-day files, then build and upload the leaderboard data.
    The function returns early (after printing a message) when there are no
    agents, no identifiers, or when DuckDB cannot be opened or queried.
    """
    banner = '=' * 80

    # Load agent metadata from HuggingFace
    agents = load_agents_from_hf()
    if not agents:
        print("No agents found in HuggingFace dataset")
        return

    # Extract all identifiers
    identifiers = []
    for agent in agents:
        if agent.get('github_identifier'):
            identifiers.append(agent['github_identifier'])
    if not identifiers:
        print("No valid agent identifiers found")
        return

    print(banner)
    print(f"Starting review metadata mining for {len(identifiers)} agents")
    print(f"Time frame: Last {LEADERBOARD_TIME_FRAME_DAYS} days")
    print(banner)

    # Initialize DuckDB connection
    try:
        conn = get_duckdb_connection()
    except Exception as e:
        print(f"Failed to initialize DuckDB connection: {str(e)}")
        return

    # Time range: the LEADERBOARD_TIME_FRAME_DAYS days before today's UTC midnight
    end_date = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
    start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)

    try:
        # Use single query for all agents
        all_metadata = fetch_all_pr_metadata_single_query(
            conn, identifiers, start_date, end_date
        )
        # Calculate summary statistics
        per_agent_lists = list(all_metadata.values())
        total_prs = sum(len(records) for records in per_agent_lists)
        agents_with_data = sum(1 for records in per_agent_lists if records)
        print(f"\n{banner}")
        print(f"DuckDB query complete!")
        print(f" Total agents: {len(agents)}")
        print(f" Agents with data: {agents_with_data}")
        print(f" Total PRs found: {total_prs}")
        print(banner)
    except Exception as e:
        print(f"Error during DuckDB fetch: {str(e)}")
        import traceback
        traceback.print_exc()
        return
    finally:
        # Close DuckDB connection on every path out of the fetch step
        conn.close()

    # Batch upload review metadata with time gaps
    success_count, error_count = batch_upload_review_metadata(all_metadata)

    # Construct and save leaderboard data
    print(banner)
    print(f"Constructing and saving leaderboard data...")
    print(f"{banner}\n")
    try:
        # Both structures are derived from the in-memory metadata
        leaderboard_dict = construct_leaderboard_from_metadata(all_metadata, agents)
        print(f"Calculating monthly metrics...")
        monthly_metrics = calculate_monthly_metrics_by_agent(all_metadata, agents)

        print(f"Saving leaderboard data to HuggingFace...")
        save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics)

        month_total = len(monthly_metrics.get('months', []))
        print(f"\n{banner}")
        print(f"ALL TASKS COMPLETE!")
        print(f" Review metadata: {success_count} files uploaded, {error_count} errors")
        print(f" Leaderboard entries: {len(leaderboard_dict)}")
        print(f" Monthly data points: {month_total} months")
        print(f" Saved to: {LEADERBOARD_REPO}/swe-review.json")
        print(banner)
    except Exception as e:
        print(f"Failed to construct/save leaderboard data: {str(e)}")
        import traceback
        traceback.print_exc()
# =============================================================================
# ENTRY POINT
# =============================================================================
# Run the full mining pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    mine_all_agents()