# SWE-Review / msr.py
import json
import os
import time
from datetime import datetime, timezone, timedelta
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from huggingface_hub import HfApi, hf_hub_download
from huggingface_hub.errors import HfHubHTTPError
from dotenv import load_dotenv
import duckdb
import backoff
import requests
import requests.exceptions
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
import logging
# Load environment variables
load_dotenv()
# =============================================================================
# CONFIGURATION
# =============================================================================
AGENTS_REPO = "SWE-Arena/bot_metadata"
LEADERBOARD_REPO = "SWE-Arena/leaderboard_metadata"
LEADERBOARD_TIME_FRAME_DAYS = 180
GHARCHIVE_DATA_DIR = "../gharchive/data"
DUCKDB_CACHE_FILE = "cache.duckdb"
# OPTIMIZED DUCKDB CONFIGURATION
DUCKDB_THREADS = 8
DUCKDB_MEMORY_LIMIT = "64GB"
# Streaming batch configuration
BATCH_SIZE_DAYS = 7 # Process 1 week at a time (~168 hourly files)
# At this size: ~7 days × 24 files × ~100MB per file = ~16GB uncompressed per batch
# Download configuration
DOWNLOAD_WORKERS = 4
DOWNLOAD_RETRY_DELAY = 2
MAX_RETRIES = 5
# Upload configuration
UPLOAD_DELAY_SECONDS = 5
UPLOAD_INITIAL_BACKOFF = 60
UPLOAD_MAX_BACKOFF = 3600
# Scheduler configuration
SCHEDULE_ENABLED = False
SCHEDULE_DAY_OF_MONTH = 22
SCHEDULE_HOUR = 0
SCHEDULE_MINUTE = 0
SCHEDULE_TIMEZONE = 'UTC'
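# The settings above are complemented by a local .env file loaded via load_dotenv().
# A minimal sketch of the expected .env contents (the value below is a placeholder,
# not a real credential):
#
#   HF_TOKEN=hf_xxxxxxxxxxxxxxxxxxxx
#
# HF_TOKEN is the only secret this script reads from the environment (see get_hf_token()).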
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def load_jsonl(filename):
"""Load JSONL file and return list of dictionaries."""
if not os.path.exists(filename):
return []
data = []
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line:
try:
data.append(json.loads(line))
except json.JSONDecodeError as e:
print(f"Warning: Skipping invalid JSON line: {e}")
return data
def save_jsonl(filename, data):
"""Save list of dictionaries to JSONL file."""
with open(filename, 'w', encoding='utf-8') as f:
for item in data:
f.write(json.dumps(item) + '\n')
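# Illustrative round-trip for the two helpers above (file name is hypothetical, not used
# elsewhere in this script):
#
#   save_jsonl('reviews.jsonl', [{'url': 'https://github.com/org/repo/pull/1'}])
#   records = load_jsonl('reviews.jsonl')  # -> [{'url': 'https://github.com/org/repo/pull/1'}]
#
# Each record occupies one line of JSON; malformed lines are skipped with a warning.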
def normalize_date_format(date_string):
"""Convert date strings or datetime objects to standardized ISO 8601 format with Z suffix."""
if not date_string or date_string == 'N/A':
return 'N/A'
try:
import re
if isinstance(date_string, datetime):
return date_string.strftime('%Y-%m-%dT%H:%M:%SZ')
date_string = re.sub(r'\s+', ' ', date_string.strip())
date_string = date_string.replace(' ', 'T')
        # Timezone offsets without minutes (e.g. '+05') are rejected by fromisoformat,
        # so append ':00' -- but only when the string actually carries a time component.
        if 'T' in date_string and len(date_string) >= 3:
            if date_string[-3:-2] in ('+', '-') and ':' not in date_string[-3:]:
                date_string = date_string + ':00'
dt = datetime.fromisoformat(date_string.replace('Z', '+00:00'))
return dt.strftime('%Y-%m-%dT%H:%M:%SZ')
except Exception as e:
print(f"Warning: Could not parse date '{date_string}': {e}")
return date_string
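# Illustrative inputs/outputs for normalize_date_format (values are made up):
#
#   '2024-05-01 12:30:00+00'      -> '2024-05-01T12:30:00Z'
#   '2024-05-01T12:30:00Z'        -> '2024-05-01T12:30:00Z'
#   datetime(2024, 5, 1, 12, 30)  -> '2024-05-01T12:30:00Z'
#   'N/A' or ''                   -> 'N/A'
#
# Strings that cannot be parsed are returned unchanged after printing a warning.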
def get_hf_token():
"""Get HuggingFace token from environment variables."""
token = os.getenv('HF_TOKEN')
if not token:
print("Warning: HF_TOKEN not found in environment variables")
return token
# =============================================================================
# GHARCHIVE DOWNLOAD FUNCTIONS
# =============================================================================
def download_file(url):
"""Download a GHArchive file with retry logic."""
filename = url.split("/")[-1]
filepath = os.path.join(GHARCHIVE_DATA_DIR, filename)
if os.path.exists(filepath):
return True
for attempt in range(MAX_RETRIES):
try:
response = requests.get(url, timeout=30)
response.raise_for_status()
with open(filepath, "wb") as f:
f.write(response.content)
return True
        except Exception as e:
            # Hours that do not exist on GHArchive (e.g. not yet published) come back as 404;
            # retrying cannot succeed, so give up on this file immediately.
            if isinstance(e, requests.exceptions.HTTPError) and e.response is not None and e.response.status_code == 404:
                return False
            wait_time = DOWNLOAD_RETRY_DELAY * (2 ** attempt)
            print(f" ⚠ {filename}: {e}, retrying in {wait_time}s (attempt {attempt + 1}/{MAX_RETRIES})")
            time.sleep(wait_time)
return False
def download_all_gharchive_data():
"""Download all GHArchive data files for the last LEADERBOARD_TIME_FRAME_DAYS."""
os.makedirs(GHARCHIVE_DATA_DIR, exist_ok=True)
    # GHArchive files are named by UTC hour, so anchor the date range in UTC
    end_date = datetime.now(timezone.utc)
start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
urls = []
current_date = start_date
while current_date <= end_date:
date_str = current_date.strftime("%Y-%m-%d")
for hour in range(24):
url = f"https://data.gharchive.org/{date_str}-{hour}.json.gz"
urls.append(url)
current_date += timedelta(days=1)
    downloads_succeeded = 0
    try:
        with ThreadPoolExecutor(max_workers=DOWNLOAD_WORKERS) as executor:
            futures = [executor.submit(download_file, url) for url in urls]
            for future in as_completed(futures):
                # Count only files that were actually downloaded (or already present on disk)
                if future.result():
                    downloads_succeeded += 1
        print(f"Download complete: {downloads_succeeded}/{len(urls)} files")
return True
except Exception as e:
print(f"Error during download: {str(e)}")
import traceback
traceback.print_exc()
return False
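# The hourly archive URLs built above follow GHArchive's public naming scheme, e.g.
#
#   https://data.gharchive.org/2024-05-01-0.json.gz     (May 1st 2024, hour 0 UTC)
#   https://data.gharchive.org/2024-05-01-23.json.gz    (May 1st 2024, hour 23 UTC)
#
# Hours run 0-23 without zero-padding; each file is a gzip-compressed, newline-delimited
# JSON dump of all public GitHub events for that hour.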
# =============================================================================
# HUGGINGFACE API WRAPPERS
# =============================================================================
def is_retryable_error(e):
"""Check if exception is retryable (rate limit or timeout error)."""
if isinstance(e, HfHubHTTPError):
if e.response.status_code == 429:
return True
if isinstance(e, (requests.exceptions.Timeout,
requests.exceptions.ReadTimeout,
requests.exceptions.ConnectTimeout)):
return True
    # Fall back to string matching for any other wrapped timeout errors
    error_str = str(e).lower()
    if 'timeout' in error_str or 'timed out' in error_str:
        return True
return False
@backoff.on_exception(
backoff.expo,
(HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
max_tries=MAX_RETRIES,
base=300,
max_value=3600,
giveup=lambda e: not is_retryable_error(e),
on_backoff=lambda details: print(
f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..."
)
)
def list_repo_files_with_backoff(api, **kwargs):
"""Wrapper for api.list_repo_files() with exponential backoff."""
return api.list_repo_files(**kwargs)
@backoff.on_exception(
backoff.expo,
(HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
max_tries=MAX_RETRIES,
base=300,
max_value=3600,
giveup=lambda e: not is_retryable_error(e),
on_backoff=lambda details: print(
f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..."
)
)
def hf_hub_download_with_backoff(**kwargs):
"""Wrapper for hf_hub_download() with exponential backoff."""
return hf_hub_download(**kwargs)
@backoff.on_exception(
backoff.expo,
(HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
max_tries=MAX_RETRIES,
base=300,
max_value=3600,
giveup=lambda e: not is_retryable_error(e),
on_backoff=lambda details: print(
f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..."
)
)
def upload_file_with_backoff(api, **kwargs):
"""Wrapper for api.upload_file() with exponential backoff."""
return api.upload_file(**kwargs)
@backoff.on_exception(
backoff.expo,
(HfHubHTTPError, requests.exceptions.Timeout, requests.exceptions.RequestException, Exception),
max_tries=MAX_RETRIES,
base=300,
max_value=3600,
giveup=lambda e: not is_retryable_error(e),
on_backoff=lambda details: print(
f" {details['exception']} error. Retrying in {details['wait']/60:.1f} minutes ({details['wait']:.0f}s) - attempt {details['tries']}/5..."
)
)
def upload_folder_with_backoff(api, **kwargs):
"""Wrapper for api.upload_folder() with exponential backoff."""
return api.upload_folder(**kwargs)
def get_duckdb_connection():
"""
Initialize DuckDB connection with OPTIMIZED memory settings.
Uses persistent database and reduced memory footprint.
"""
conn = duckdb.connect(DUCKDB_CACHE_FILE)
# OPTIMIZED SETTINGS
conn.execute(f"SET threads TO {DUCKDB_THREADS};")
conn.execute("SET preserve_insertion_order = false;")
conn.execute("SET enable_object_cache = true;")
conn.execute("SET temp_directory = '/tmp/duckdb_temp';")
conn.execute(f"SET memory_limit = '{DUCKDB_MEMORY_LIMIT}';")
conn.execute(f"SET max_memory = '{DUCKDB_MEMORY_LIMIT}';")
return conn
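# A quick way to sanity-check the session configured above (illustrative; relies on
# DuckDB's current_setting() scalar function):
#
#   conn = get_duckdb_connection()
#   conn.execute("SELECT current_setting('memory_limit'), current_setting('threads')").fetchone()
#
# temp_directory points at /tmp/duckdb_temp, where DuckDB spills intermediate results when a
# query exceeds the memory limit, so that path should sit on a volume with spare disk space.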
def generate_file_path_patterns(start_date, end_date, data_dir=GHARCHIVE_DATA_DIR):
"""Generate file path patterns for GHArchive data in date range (only existing files)."""
file_patterns = []
missing_dates = set()
current_date = start_date.replace(hour=0, minute=0, second=0, microsecond=0)
end_day = end_date.replace(hour=0, minute=0, second=0, microsecond=0)
while current_date <= end_day:
date_has_files = False
for hour in range(24):
pattern = os.path.join(data_dir, f"{current_date.strftime('%Y-%m-%d')}-{hour}.json.gz")
if os.path.exists(pattern):
file_patterns.append(pattern)
date_has_files = True
if not date_has_files:
missing_dates.add(current_date.strftime('%Y-%m-%d'))
current_date += timedelta(days=1)
if missing_dates:
print(f" ⚠ Skipping {len(missing_dates)} date(s) with no data")
return file_patterns
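# Example of the paths this helper yields (dates are illustrative), matching the files
# written by download_all_gharchive_data():
#
#   ../gharchive/data/2024-05-01-0.json.gz
#   ../gharchive/data/2024-05-01-1.json.gz
#   ...
#   ../gharchive/data/2024-05-07-23.json.gz
#
# Only paths that already exist on disk are returned; whole days with no files are reported
# as skipped.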
# =============================================================================
# STREAMING BATCH PROCESSING FOR REVIEW METADATA
# =============================================================================
def fetch_all_review_metadata_streaming(conn, identifiers, start_date, end_date):
"""
OPTIMIZED: Fetch review metadata using streaming batch processing.
Processes GHArchive files in BATCH_SIZE_DAYS chunks to limit memory usage.
    Instead of loading the full ~180-day window (over 4,300 hourly files) at once,
    it processes BATCH_SIZE_DAYS (7) days at a time. This prevents OOM errors by:
    1. Only keeping ~168 hourly files in memory per batch (vs. thousands)
2. Incrementally building the results dictionary
3. Allowing DuckDB to garbage collect after each batch
Args:
conn: DuckDB connection instance
identifiers: List of GitHub usernames/bot identifiers
start_date: Start datetime (timezone-aware)
end_date: End datetime (timezone-aware)
Returns:
Dictionary mapping agent identifier to list of review metadata
"""
    # Quote identifiers for the SQL IN (...) list, escaping any embedded single quotes
    identifier_list = ', '.join("'" + ident.replace("'", "''") + "'" for ident in identifiers)
metadata_by_agent = defaultdict(list)
# Calculate total batches
total_days = (end_date - start_date).days
total_batches = (total_days // BATCH_SIZE_DAYS) + 1
# Process in configurable batches
current_date = start_date
batch_num = 0
total_reviews = 0
print(f" Streaming {total_batches} batches of {BATCH_SIZE_DAYS}-day intervals...")
while current_date <= end_date:
batch_num += 1
batch_end = min(current_date + timedelta(days=BATCH_SIZE_DAYS - 1), end_date)
# Get file patterns for THIS BATCH ONLY
file_patterns = generate_file_path_patterns(current_date, batch_end)
if not file_patterns:
print(f" Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} - NO DATA")
current_date = batch_end + timedelta(days=1)
continue
# Progress indicator
print(f" Batch {batch_num}/{total_batches}: {current_date.date()} to {batch_end.date()} ({len(file_patterns)} files)... ", end="", flush=True)
# Build file patterns SQL for THIS BATCH
file_patterns_sql = '[' + ', '.join([f"'{fp}'" for fp in file_patterns]) + ']'
# SIMPLIFIED query for review metadata
# Focuses on PullRequestReviewEvent and tracks PR status
query = f"""
WITH review_events AS (
SELECT
payload.pull_request.html_url as pr_url,
actor.login as reviewer,
COALESCE(payload.review.submitted_at, created_at) as reviewed_at
FROM read_json({file_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
WHERE
type = 'PullRequestReviewEvent'
AND payload.pull_request.html_url IS NOT NULL
AND actor.login IN ({identifier_list})
),
pr_status AS (
SELECT
payload.pull_request.html_url as pr_url,
payload.pull_request.merged as is_merged,
payload.pull_request.merged_at as merged_at,
payload.pull_request.closed_at as closed_at,
ROW_NUMBER() OVER (PARTITION BY payload.pull_request.html_url ORDER BY created_at DESC) as rn
FROM read_json({file_patterns_sql}, union_by_name=true, filename=true, compression='gzip', format='newline_delimited', ignore_errors=true, maximum_object_size=2147483648)
WHERE
type = 'PullRequestEvent'
AND payload.action = 'closed'
AND payload.pull_request.html_url IS NOT NULL
AND payload.pull_request.html_url IN (SELECT DISTINCT pr_url FROM review_events)
)
SELECT
re.reviewer,
re.pr_url as url,
re.reviewed_at,
ps.merged_at,
ps.closed_at
FROM review_events re
LEFT JOIN (SELECT * FROM pr_status WHERE rn = 1) ps ON re.pr_url = ps.pr_url
ORDER BY re.reviewer, re.reviewed_at DESC
"""
try:
results = conn.execute(query).fetchall()
batch_reviews = 0
# Add results to accumulating dictionary
for row in results:
reviewer = row[0]
url = row[1]
reviewed_at = normalize_date_format(row[2]) if row[2] else None
merged_at = normalize_date_format(row[3]) if row[3] else None
closed_at = normalize_date_format(row[4]) if row[4] else None
if not url or not reviewed_at:
continue
review_metadata = {
'url': url,
'reviewed_at': reviewed_at,
'merged_at': merged_at,
'closed_at': closed_at,
}
metadata_by_agent[reviewer].append(review_metadata)
batch_reviews += 1
total_reviews += 1
print(f"✓ {batch_reviews} reviews found")
except Exception as e:
print(f"\n ✗ Batch {batch_num} error: {str(e)}")
import traceback
traceback.print_exc()
# Move to next batch
current_date = batch_end + timedelta(days=1)
# Final summary
agents_with_data = sum(1 for reviews in metadata_by_agent.values() if reviews)
print(f"\n ✓ Complete: {total_reviews} reviews found for {agents_with_data}/{len(identifiers)} agents")
return dict(metadata_by_agent)
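# Shape of the dictionary returned above (field values are illustrative):
#
#   {
#       'some-review-bot': [
#           {
#               'url': 'https://github.com/org/repo/pull/123',
#               'reviewed_at': '2024-05-01T12:30:00Z',
#               'merged_at': '2024-05-02T08:00:00Z',   # None if the PR was not merged
#               'closed_at': '2024-05-02T08:00:00Z',   # None if the PR is still open
#           },
#           ...
#       ],
#   }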
# =============================================================================
# HUGGINGFACE STORAGE FUNCTIONS
# =============================================================================
def load_agents_from_hf():
"""Load all agent metadata JSON files from HuggingFace dataset."""
try:
api = HfApi()
agents = []
files = list_repo_files_with_backoff(api=api, repo_id=AGENTS_REPO, repo_type="dataset")
json_files = [f for f in files if f.endswith('.json')]
for json_file in json_files:
try:
file_path = hf_hub_download_with_backoff(
repo_id=AGENTS_REPO,
filename=json_file,
repo_type="dataset"
)
with open(file_path, 'r') as f:
agent_data = json.load(f)
if agent_data.get('status') != 'public':
continue
github_identifier = json_file.replace('.json', '')
agent_data['github_identifier'] = github_identifier
agents.append(agent_data)
except Exception as e:
print(f"Error loading {json_file}: {str(e)}")
continue
print(f"Download complete: {len(agents)} agents")
return agents
except Exception as e:
print(f"Could not load agents from HuggingFace: {str(e)}")
return []
# =============================================================================
# STATISTICS CALCULATION
# =============================================================================
def get_pr_status_from_metadata(review_meta):
"""Derive PR status from merged_at and closed_at fields."""
merged_at = review_meta.get('merged_at')
closed_at = review_meta.get('closed_at')
if merged_at:
return 'merged'
elif closed_at:
return 'closed'
else:
return 'open'
def calculate_review_stats_from_metadata(metadata_list):
"""Calculate statistics from a list of review metadata."""
total_reviews = len(metadata_list)
merged_prs = sum(1 for review_meta in metadata_list
if get_pr_status_from_metadata(review_meta) == 'merged')
rejected_prs = sum(1 for review_meta in metadata_list
if get_pr_status_from_metadata(review_meta) == 'closed')
pending_prs = sum(1 for review_meta in metadata_list
if get_pr_status_from_metadata(review_meta) == 'open')
# Calculate acceptance rate (exclude pending PRs)
completed_prs = merged_prs + rejected_prs
acceptance_rate = (merged_prs / completed_prs * 100) if completed_prs > 0 else 0
return {
'total_reviews': total_reviews,
'merged_prs': merged_prs,
'pending_prs': pending_prs,
'acceptance_rate': round(acceptance_rate, 2),
}
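# Worked example for the stats above: 10 reviews where 6 PRs were merged, 2 were closed
# without merging, and 2 are still open. Pending PRs are excluded from the rate, so
# acceptance_rate = 6 / (6 + 2) * 100 = 75.0, and the function returns
#
#   {'total_reviews': 10, 'merged_prs': 6, 'pending_prs': 2, 'acceptance_rate': 75.0}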
def calculate_monthly_metrics_by_agent(all_metadata_dict, agents):
"""Calculate monthly metrics for all agents for visualization."""
identifier_to_name = {agent.get('github_identifier'): agent.get('name') for agent in agents if agent.get('github_identifier')}
if not all_metadata_dict:
return {'agents': [], 'months': [], 'data': {}}
agent_month_data = defaultdict(lambda: defaultdict(list))
for agent_identifier, metadata_list in all_metadata_dict.items():
for review_meta in metadata_list:
reviewed_at = review_meta.get('reviewed_at')
if not reviewed_at:
continue
agent_name = identifier_to_name.get(agent_identifier, agent_identifier)
try:
dt = datetime.fromisoformat(reviewed_at.replace('Z', '+00:00'))
month_key = f"{dt.year}-{dt.month:02d}"
agent_month_data[agent_name][month_key].append(review_meta)
except Exception as e:
print(f"Warning: Could not parse date '{reviewed_at}': {e}")
continue
all_months = set()
for agent_data in agent_month_data.values():
all_months.update(agent_data.keys())
months = sorted(list(all_months))
result_data = {}
for agent_name, month_dict in agent_month_data.items():
acceptance_rates = []
total_reviews_list = []
merged_prs_list = []
for month in months:
reviews_in_month = month_dict.get(month, [])
merged_count = sum(1 for review in reviews_in_month
if get_pr_status_from_metadata(review) == 'merged')
rejected_count = sum(1 for review in reviews_in_month
if get_pr_status_from_metadata(review) == 'closed')
total_count = len(reviews_in_month)
completed_count = merged_count + rejected_count
acceptance_rate = (merged_count / completed_count * 100) if completed_count > 0 else None
acceptance_rates.append(acceptance_rate)
total_reviews_list.append(total_count)
merged_prs_list.append(merged_count)
result_data[agent_name] = {
'acceptance_rates': acceptance_rates,
'total_reviews': total_reviews_list,
'merged_prs': merged_prs_list,
}
agents_list = sorted(list(agent_month_data.keys()))
return {
'agents': agents_list,
'months': months,
'data': result_data
}
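# Shape of the monthly metrics structure returned above (values are illustrative):
#
#   {
#       'agents': ['Agent A', 'Agent B'],
#       'months': ['2024-04', '2024-05'],
#       'data': {
#           'Agent A': {
#               'acceptance_rates': [75.0, None],   # None when a month has no completed PRs
#               'total_reviews': [10, 0],
#               'merged_prs': [6, 0],
#           },
#       },
#   }
#
# The per-agent lists are index-aligned with 'months', one entry per month.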
def construct_leaderboard_from_metadata(all_metadata_dict, agents):
"""Construct leaderboard from in-memory review metadata."""
if not agents:
print("Error: No agents found")
return {}
cache_dict = {}
for agent in agents:
identifier = agent.get('github_identifier')
agent_name = agent.get('name', 'Unknown')
bot_metadata = all_metadata_dict.get(identifier, [])
stats = calculate_review_stats_from_metadata(bot_metadata)
cache_dict[identifier] = {
'name': agent_name,
'website': agent.get('website', 'N/A'),
'github_identifier': identifier,
**stats
}
return cache_dict
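# Example of a single leaderboard entry produced above (agent is illustrative):
#
#   'some-review-bot': {
#       'name': 'Some Review Bot',
#       'website': 'https://example.com',
#       'github_identifier': 'some-review-bot',
#       'total_reviews': 10,
#       'merged_prs': 6,
#       'pending_prs': 2,
#       'acceptance_rate': 75.0,
#   }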
def save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics):
"""Save leaderboard data and monthly metrics to HuggingFace dataset."""
try:
token = get_hf_token()
if not token:
raise Exception("No HuggingFace token found")
api = HfApi(token=token)
filename = "swe-review.json"
combined_data = {
'last_updated': datetime.now(timezone.utc).isoformat(),
'leaderboard': leaderboard_dict,
'monthly_metrics': monthly_metrics,
'metadata': {
'leaderboard_time_frame_days': LEADERBOARD_TIME_FRAME_DAYS
}
}
with open(filename, 'w') as f:
json.dump(combined_data, f, indent=2)
try:
upload_file_with_backoff(
api=api,
path_or_fileobj=filename,
path_in_repo=filename,
repo_id=LEADERBOARD_REPO,
repo_type="dataset"
)
return True
finally:
if os.path.exists(filename):
os.remove(filename)
except Exception as e:
print(f"Error saving leaderboard data: {str(e)}")
import traceback
traceback.print_exc()
return False
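# Top-level layout of the swe-review.json file uploaded above:
#
#   {
#       "last_updated": "<ISO 8601 UTC timestamp>",
#       "leaderboard": { "<github_identifier>": { ...per-agent stats... } },
#       "monthly_metrics": { "agents": [...], "months": [...], "data": {...} },
#       "metadata": { "leaderboard_time_frame_days": LEADERBOARD_TIME_FRAME_DAYS }
#   }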
# =============================================================================
# MINING FUNCTION
# =============================================================================
def mine_all_agents():
"""
Mine review metadata for all agents using STREAMING batch processing.
Downloads GHArchive data, then uses BATCH-based DuckDB queries.
"""
print(f"\n[1/4] Downloading GHArchive data...")
if not download_all_gharchive_data():
print("Warning: Download had errors, continuing with available data...")
print(f"\n[2/4] Loading agent metadata...")
agents = load_agents_from_hf()
if not agents:
print("Error: No agents found")
return
identifiers = [agent['github_identifier'] for agent in agents if agent.get('github_identifier')]
if not identifiers:
print("Error: No valid agent identifiers found")
return
print(f"\n[3/4] Mining review metadata ({len(identifiers)} agents, {LEADERBOARD_TIME_FRAME_DAYS} days)...")
try:
conn = get_duckdb_connection()
except Exception as e:
print(f"Failed to initialize DuckDB connection: {str(e)}")
return
current_time = datetime.now(timezone.utc)
end_date = current_time.replace(hour=0, minute=0, second=0, microsecond=0)
start_date = end_date - timedelta(days=LEADERBOARD_TIME_FRAME_DAYS)
try:
# USE STREAMING FUNCTION
all_metadata = fetch_all_review_metadata_streaming(
conn, identifiers, start_date, end_date
)
except Exception as e:
print(f"Error during DuckDB fetch: {str(e)}")
import traceback
traceback.print_exc()
return
finally:
conn.close()
print(f"\n[4/4] Saving leaderboard...")
try:
leaderboard_dict = construct_leaderboard_from_metadata(all_metadata, agents)
monthly_metrics = calculate_monthly_metrics_by_agent(all_metadata, agents)
save_leaderboard_data_to_hf(leaderboard_dict, monthly_metrics)
except Exception as e:
print(f"Error saving leaderboard: {str(e)}")
import traceback
traceback.print_exc()
# =============================================================================
# SCHEDULER SETUP
# =============================================================================
def setup_scheduler():
"""Set up APScheduler to run mining jobs periodically."""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logging.getLogger('httpx').setLevel(logging.WARNING)
scheduler = BlockingScheduler(timezone=SCHEDULE_TIMEZONE)
trigger = CronTrigger(
day=SCHEDULE_DAY_OF_MONTH,
hour=SCHEDULE_HOUR,
minute=SCHEDULE_MINUTE,
timezone=SCHEDULE_TIMEZONE
)
scheduler.add_job(
mine_all_agents,
trigger=trigger,
id='mine_all_agents',
name='Mine GHArchive data for all agents',
replace_existing=True
)
    next_run = trigger.get_next_fire_time(None, datetime.now(trigger.timezone))
    print(f"Scheduler: Monthly on day {SCHEDULE_DAY_OF_MONTH} at {SCHEDULE_HOUR:02d}:{SCHEDULE_MINUTE:02d} {SCHEDULE_TIMEZONE}")
    print(f"Next run: {next_run}\n")
    print("Scheduler started")
scheduler.start()
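# setup_scheduler() blocks the calling process: BlockingScheduler.start() does not return
# until the scheduler is shut down. With the settings above it fires mine_all_agents on day
# SCHEDULE_DAY_OF_MONTH (22) of each month at 00:00 UTC. Set SCHEDULE_ENABLED = True to use
# this mode instead of the immediate one-off run below.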
# =============================================================================
# ENTRY POINT
# =============================================================================
if __name__ == "__main__":
if SCHEDULE_ENABLED:
setup_scheduler()
else:
mine_all_agents()