Zen0
Improve UX: Move evaluation settings to top of page
62d78bf
#!/usr/bin/env python3
"""
AusCyberBench Evaluation Dashboard
Interactive Gradio Space for benchmarking LLMs on Australian cybersecurity knowledge
"""
import gradio as gr
import spaces
import torch
import gc
import json
import re
import time
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
from collections import defaultdict
from datasets import load_dataset
from transformers import AutoTokenizer, AutoModelForCausalLM, BitsAndBytesConfig
import numpy as np
# Australian color scheme
AUSSIE_GREEN = '#008751'
AUSSIE_GOLD = '#FFB81C'
# Model categories - proven stable models
MODELS_BY_CATEGORY = {
"✅ Recommended (Tested)": [
"microsoft/Phi-3-mini-4k-instruct", # Proven stable
"microsoft/Phi-3.5-mini-instruct", # Works well
"Qwen/Qwen2.5-3B-Instruct", # Just tested 55.6%! ⭐
"Qwen/Qwen2.5-7B-Instruct", # Good performance
"deepseek-ai/deepseek-llm-7b-chat", # Previously tested 55%+
"TinyLlama/TinyLlama-1.1B-Chat-v1.0", # Previously tested 33%+
],
"🛡️ Cybersecurity-Focused": [
"deepseek-ai/deepseek-coder-6.7b-instruct", # Code security
"WizardLM/WizardCoder-Python-7B-V1.0", # Wizard Coder
"bigcode/starcoder2-7b", # StarCoder2
"meta-llama/CodeLlama-7b-Instruct-hf", # CodeLlama
"Salesforce/codegen25-7b-instruct", # CodeGen
],
"Small Models (1-4B)": [
"microsoft/Phi-3-mini-4k-instruct",
"microsoft/Phi-3.5-mini-instruct",
"Qwen/Qwen2.5-3B-Instruct",
"TinyLlama/TinyLlama-1.1B-Chat-v1.0",
# Removed gated models: google/gemma-2-2b-it, meta-llama/Llama-3.2-3B-Instruct
# Removed: stabilityai/stablelm-2-1_6b-chat (0% accuracy)
],
"Medium Models (7-12B)": [
"mistralai/Mistral-7B-Instruct-v0.3",
"Qwen/Qwen2.5-7B-Instruct",
"mistralai/Mistral-Nemo-Instruct-2407",
"01-ai/Yi-1.5-9B-Chat",
# Removed gated models: meta-llama/Llama-3.1-8B-Instruct, google/gemma-2-9b-it
],
"Reasoning & Analysis": [
"deepseek-ai/deepseek-llm-7b-chat",
"upstage/SOLAR-10.7B-Instruct-v1.0",
"NousResearch/Hermes-3-Llama-3.1-8B",
"Qwen/Qwen2.5-14B-Instruct",
],
"Diverse & Multilingual": [
"tiiuae/falcon-7b-instruct",
"openchat/openchat-3.5-0106",
"teknium/OpenHermes-2.5-Mistral-7B",
],
}
# Flatten all models
ALL_MODELS = [model for category in MODELS_BY_CATEGORY.values() for model in category]
# Global state
current_results = []
dataset_cache = None
PERSISTENT_RESULTS_FILE = "persistent_results.json"
def load_persistent_results():
"""Load persistent results from disk"""
if Path(PERSISTENT_RESULTS_FILE).exists():
try:
with open(PERSISTENT_RESULTS_FILE, 'r') as f:
return json.load(f)
except Exception as e:
print(f"Error loading persistent results: {e}")
return []
return []
def save_persistent_results(results):
"""Save results to persistent storage"""
try:
with open(PERSISTENT_RESULTS_FILE, 'w') as f:
json.dump(results, f, indent=2)
except Exception as e:
print(f"Error saving persistent results: {e}")
def merge_results(existing_results, new_results):
"""Merge new results with existing, keeping best score per model"""
# Create dict of existing results keyed by model name
results_dict = {r['model']: r for r in existing_results}
# Update with new results (keep best accuracy)
for new_result in new_results:
model_name = new_result['model']
if model_name in results_dict:
# Keep result with higher accuracy
existing_acc = results_dict[model_name].get('overall_accuracy', 0)
new_acc = new_result.get('overall_accuracy', 0)
if new_acc > existing_acc:
results_dict[model_name] = new_result
else:
results_dict[model_name] = new_result
# Convert back to list and sort by accuracy
merged = list(results_dict.values())
merged.sort(key=lambda x: x.get('overall_accuracy', 0), reverse=True)
return merged
def clear_persistent_results():
"""Clear all persistent results"""
try:
if Path(PERSISTENT_RESULTS_FILE).exists():
Path(PERSISTENT_RESULTS_FILE).unlink()
# Return empty displays
return (
"✅ Persistent results cleared!",
pd.DataFrame(),
None,
None
)
except Exception as e:
return (
f"❌ Error clearing results: {e}",
pd.DataFrame(),
None,
None
)
def load_initial_leaderboard():
"""Load and display persistent leaderboard on startup"""
persistent_results = load_persistent_results()
if persistent_results:
table = format_results_table(persistent_results)
chart = create_comparison_chart(persistent_results)
download = create_download_data(persistent_results)
return table, chart, download
return pd.DataFrame(), None, None
def load_benchmark_dataset(subset="australian", num_samples=200):
"""Load and sample AusCyberBench dataset"""
global dataset_cache
if dataset_cache is None:
# Load data files individually to handle different schemas per file
from datasets import concatenate_datasets
# Get list of category files for the subset
import glob
from huggingface_hub import hf_hub_download
# Manually specify the categories to avoid globbing issues
categories = [
"knowledge_terminology",
"knowledge_threat_intelligence",
"regulatory_essential_eight",
"regulatory_ism_controls",
"regulatory_privacy_act",
"regulatory_soci_act"
]
datasets_list = []
for category in categories:
try:
ds = load_dataset(
"json",
data_files=f"hf://datasets/Zen0/AusCyberBench/data/{subset}/{category}.jsonl",
split="train"
)
# Remove metadata columns that may differ between files
cols_to_remove = [col for col in ds.column_names if col not in [
'task_id', 'category', 'subcategory', 'title', 'description',
'task_type', 'difficulty', 'answer', 'options', 'context',
'australian_focus', 'regulatory_references'
]]
if cols_to_remove:
ds = ds.remove_columns(cols_to_remove)
datasets_list.append(ds)
except Exception as e:
print(f"Warning: Could not load {category}: {e}")
# Concatenate all datasets
dataset_cache = concatenate_datasets(datasets_list)
# Proportional sampling
import random
random.seed(42)
by_category = defaultdict(list)
for item in dataset_cache:
by_category[item['category']].append(item)
total = len(dataset_cache)
samples = []
for cat, items in by_category.items():
n_cat = max(1, int(len(items) / total * num_samples))
if len(items) <= n_cat:
samples.extend(items)
else:
samples.extend(random.sample(items, n_cat))
random.shuffle(samples)
return samples[:num_samples]
def format_prompt(task, model_name):
"""Format task as prompt with proper chat template"""
question = task['description']
if task.get('task_type') == 'multiple_choice' and 'options' in task:
options_text = "\n".join([f"{opt['id']}. {opt['text']}" for opt in task['options']])
if 'phi' in model_name.lower():
return f"""<|user|>
{question}
{options_text}
Respond with ONLY the letter of the correct answer (A, B, C, or D).<|end|>
<|assistant|>"""
elif 'gemma' in model_name.lower():
return f"""<start_of_turn>user
{question}
{options_text}
Respond with ONLY the letter of the correct answer (A, B, C, or D).<end_of_turn>
<start_of_turn>model
"""
else:
return f"""[INST] {question}
{options_text}
Respond with ONLY the letter of the correct answer (A, B, C, or D). [/INST]"""
else:
return f"""[INST] {question} [/INST]"""
def extract_answer(response, task):
"""Extract answer letter from model response"""
response = response.strip()
if task.get('task_type') == 'multiple_choice':
# Try multiple extraction patterns
# Pattern 1: Letter with word boundary
match = re.search(r'\b([A-D])\b', response, re.IGNORECASE)
if match:
return match.group(1).upper()
# Pattern 2: Letter with punctuation (A. A) A: etc)
match = re.search(r'([A-D])[.):,]', response, re.IGNORECASE)
if match:
return match.group(1).upper()
# Pattern 3: "Answer: A" or "Answer is A"
match = re.search(r'(?:answer|choice)(?:\s+is)?\s*:?\s*([A-D])\b', response, re.IGNORECASE)
if match:
return match.group(1).upper()
# Pattern 4: First character if it's A-D
if response and response[0].upper() in ['A', 'B', 'C', 'D']:
return response[0].upper()
# Pattern 5: Look anywhere in first 50 chars for isolated letter
first_part = response[:50]
for char in first_part:
if char.upper() in ['A', 'B', 'C', 'D']:
return char.upper()
return ""
else:
return response[:100]
def cleanup_model(model, tokenizer):
"""Thoroughly clean up model to free memory"""
if model is not None:
del model
if tokenizer is not None:
del tokenizer
if torch.cuda.is_available():
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
gc.collect()
@spaces.GPU # Uses default 60s duration (ZeroGPU free tier limit)
def evaluate_single_model(model_name, tasks, use_4bit=True, temperature=0.7, max_tokens=128, progress=gr.Progress()):
"""Evaluate a single model on the benchmark"""
progress(0, desc=f"Loading {model_name.split('/')[-1]}...")
try:
# Load model
if use_4bit:
quant_config = BitsAndBytesConfig(
load_in_4bit=True,
bnb_4bit_compute_dtype=torch.float16,
bnb_4bit_use_double_quant=True,
bnb_4bit_quant_type="nf4"
)
else:
quant_config = None
tokenizer = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True, use_fast=False)
model = AutoModelForCausalLM.from_pretrained(
model_name,
quantization_config=quant_config,
device_map="auto",
trust_remote_code=True,
torch_dtype=torch.float16 if not use_4bit else None
)
if tokenizer.pad_token is None:
tokenizer.pad_token = tokenizer.eos_token
progress(0.1, desc=f"Evaluating {model_name.split('/')[-1]}...")
# Evaluate tasks
results = []
for i, task in enumerate(tasks):
progress((0.1 + 0.8 * i / len(tasks)), desc=f"Task {i+1}/{len(tasks)}")
try:
prompt = format_prompt(task, model_name)
# COMPREHENSIVE DEBUG
if i == 0:
import sys
debug_msg = f"\n{'='*60}\nDEBUG FIRST TASK\n{'='*60}\n"
debug_msg += f"Prompt length: {len(prompt)} chars\n"
debug_msg += f"Prompt preview: {prompt[:200]}...\n"
print(debug_msg, flush=True)
sys.stdout.flush()
inputs = tokenizer(prompt, return_tensors="pt").to(model.device)
if 'token_type_ids' in inputs:
inputs.pop('token_type_ids')
if i == 0:
print(f"Input shape: {inputs['input_ids'].shape}", flush=True)
with torch.no_grad():
outputs = model.generate(
**inputs,
max_new_tokens=max_tokens,
temperature=temperature,
do_sample=True,
top_p=0.9,
pad_token_id=tokenizer.eos_token_id,
use_cache=False # Disable KV cache to avoid DynamicCache compatibility issues
)
if i == 0:
print(f"Output shape: {outputs.shape}", flush=True)
print(f"Input length: {inputs['input_ids'].shape[1]}", flush=True)
response = tokenizer.decode(
outputs[0][inputs['input_ids'].shape[1]:],
skip_special_tokens=True
)
# FORCE PRINT WITH FLUSH
if i < 3:
import sys
msg = f"\n>>> TASK {i} RESPONSE: '{response}' (len={len(response)})\n"
print(msg, flush=True)
sys.stdout.flush()
# Also write to file for debugging
with open('/tmp/debug_responses.txt', 'a') as f:
f.write(msg)
predicted = extract_answer(response, task)
correct = task.get('answer', '')
is_correct = predicted.upper() == correct.upper()
if i < 3:
msg = f">>> TASK {i} EXTRACT: predicted='{predicted}', correct='{correct}', match={is_correct}\n"
print(msg, flush=True)
sys.stdout.flush()
with open('/tmp/debug_responses.txt', 'a') as f:
f.write(msg)
results.append({
'task_id': task.get('task_id'),
'category': task.get('category'),
'predicted': predicted,
'correct': correct,
'is_correct': is_correct
})
except Exception as e:
import traceback
import sys
error_msg = f"\n!!! EXCEPTION in task {i}: {str(e)}\n{traceback.format_exc()}\n"
print(error_msg, flush=True)
sys.stdout.flush()
with open('/tmp/debug_responses.txt', 'a') as f:
f.write(error_msg)
results.append({
'task_id': task.get('task_id'),
'category': task.get('category'),
'predicted': '',
'correct': task.get('answer', ''),
'is_correct': False
})
# Calculate metrics
total_correct = sum(1 for r in results if r['is_correct'])
overall_accuracy = (total_correct / len(results)) * 100
category_stats = defaultdict(lambda: {'correct': 0, 'total': 0})
for result in results:
cat = result['category']
category_stats[cat]['total'] += 1
if result['is_correct']:
category_stats[cat]['correct'] += 1
category_scores = {
cat: (stats['correct'] / stats['total']) * 100 if stats['total'] > 0 else 0
for cat, stats in category_stats.items()
}
progress(1.0, desc="Complete!")
return {
'model': model_name,
'overall_accuracy': overall_accuracy,
'total_correct': total_correct,
'total_tasks': len(results),
'category_scores': category_scores,
'detailed_results': results
}
except Exception as e:
return {
'model': model_name,
'error': str(e),
'overall_accuracy': 0,
'total_correct': 0,
'total_tasks': len(tasks)
}
finally:
cleanup_model(
model if 'model' in locals() else None,
tokenizer if 'tokenizer' in locals() else None
)
def run_evaluation(selected_models, num_samples, use_4bit, temperature, max_tokens, progress=gr.Progress()):
"""Run evaluation on selected models"""
global current_results
if not selected_models:
return "Please select at least one model to evaluate.", None, None
# Load existing persistent results
persistent_results = load_persistent_results()
# Load dataset
progress(0, desc="Loading AusCyberBench dataset...")
tasks = load_benchmark_dataset(num_samples=num_samples)
# Evaluate each model
new_results = []
for i, model_name in enumerate(selected_models):
progress((i / len(selected_models)), desc=f"Model {i+1}/{len(selected_models)}")
result = evaluate_single_model(
model_name, tasks, use_4bit, temperature, max_tokens, progress
)
new_results.append(result)
# Merge with persistent results after each model
current_results = merge_results(persistent_results, new_results)
save_persistent_results(current_results)
# Yield intermediate results (showing full leaderboard including historical)
yield format_results_table(current_results), create_comparison_chart(current_results), None
# Final results (merged with historical)
current_results = merge_results(persistent_results, new_results)
save_persistent_results(current_results)
final_table = format_results_table(current_results)
final_chart = create_comparison_chart(current_results)
download_data = create_download_data(current_results)
yield final_table, final_chart, download_data
def format_results_table(results):
"""Format results as DataFrame for display"""
if not results:
return pd.DataFrame()
rows = []
for result in results:
if 'error' in result:
rows.append({
'Rank': '❌',
'Model': result['model'].split('/')[-1],
'Accuracy': '0.0%',
'Correct/Total': f"0/{result['total_tasks']}",
'Status': f"Error: {result['error'][:50]}"
})
else:
rows.append({
'Rank': '',
'Model': result['model'].split('/')[-1],
'Accuracy': f"{result['overall_accuracy']:.1f}%",
'Correct/Total': f"{result['total_correct']}/{result['total_tasks']}",
'Status': '✓ Complete'
})
df = pd.DataFrame(rows)
# Sort by accuracy and assign ranks
df['_sort'] = df['Accuracy'].str.replace('%', '').astype(float)
df = df.sort_values('_sort', ascending=False)
# Assign medals (handle cases with fewer than 3 models)
medals = ['🥇', '🥈', '🥉']
ranks = medals[:len(df)] + [''] * max(0, len(df) - len(medals))
df['Rank'] = ranks
df = df.drop('_sort', axis=1)
return df
def create_comparison_chart(results):
"""Create enhanced bar chart comparing model accuracies with Australian color scheme"""
if not results or all('error' in r for r in results):
return None
valid_results = [r for r in results if 'error' not in r]
if not valid_results:
return None
models = [r['model'].split('/')[-1] for r in valid_results]
accuracies = [r['overall_accuracy'] for r in valid_results]
# Sort by accuracy
sorted_pairs = sorted(zip(models, accuracies), key=lambda x: x[1], reverse=True)
models, accuracies = zip(*sorted_pairs)
# Create figure with Australian colors
fig, ax = plt.subplots(figsize=(14, max(7, len(models) * 0.45)))
# Create color gradient from green to gold
colors = []
for i, acc in enumerate(accuracies):
# Top performers get gold, others get green with varying intensity
if i == 0:
colors.append(AUSSIE_GOLD)
elif i < 3:
colors.append('#00A86B') # Bright green
else:
colors.append(AUSSIE_GREEN)
bars = ax.barh(models, accuracies, color=colors, edgecolor='black', linewidth=0.5)
# Add accuracy labels
for i, (model, acc) in enumerate(zip(models, accuracies)):
ax.text(acc + 1.5, i, f'{acc:.1f}%', va='center', fontweight='bold', fontsize=10)
# Styling
ax.set_xlabel('Accuracy (%)', fontsize=13, fontweight='bold')
ax.set_title('AusCyberBench: Model Performance Ranking', fontsize=15, fontweight='bold', pad=20)
ax.set_xlim(0, min(100, max(accuracies) + 10))
ax.grid(axis='x', alpha=0.3, linestyle='--')
ax.spines['top'].set_visible(False)
ax.spines['right'].set_visible(False)
# Add background color
ax.set_facecolor('#f9f9f9')
plt.tight_layout()
return plt
def create_download_data(results):
"""Create downloadable results file"""
if not results:
return None
# Create comprehensive results JSON
output = {
'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
'benchmark': 'AusCyberBench',
'results': results
}
# Save to file
output_path = 'auscyberbench_results.json'
with open(output_path, 'w') as f:
json.dump(output, f, indent=2)
return output_path
# Build Gradio interface
with gr.Blocks(title="AusCyberBench Evaluation Dashboard", theme=gr.themes.Soft()) as app:
gr.Markdown("""
# 🇦🇺 AusCyberBench Evaluation Dashboard
**Australia's First LLM Cybersecurity Benchmark** • 13,449 Tasks • 25 Open Models
Evaluate proven open language models on Australian cybersecurity knowledge including
Essential Eight, ISM Controls, Privacy Act, SOCI Act, and ACSC Threat Intelligence.
✅ **Recommended models** have been tested: Qwen2.5-3B (55.6%), DeepSeek (55%), TinyLlama (33%)
""")
# Settings section at top for better UX
gr.Markdown("## ⚙️ Evaluation Settings")
with gr.Row():
num_samples = gr.Slider(10, 500, value=10, step=10, label="Number of Tasks (10 recommended)")
use_4bit = gr.Checkbox(label="Use 4-bit Quantisation", value=True)
with gr.Row():
temperature = gr.Slider(0.1, 1.0, value=0.7, step=0.1, label="Temperature")
max_tokens = gr.Slider(8, 256, value=32, step=8, label="Max New Tokens")
run_btn = gr.Button("🚀 Run Evaluation", variant="primary", size="lg")
gr.Markdown("---")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 📋 Model Selection")
gr.Markdown("""
**💾 Persistent Results:** Run 1-2 models at a time to avoid GPU timeouts.
Results merge with the leaderboard automatically!
""")
# Quick selection buttons
with gr.Row():
btn_recommended = gr.Button("✅ Recommended (6)", size="sm", variant="primary")
btn_security = gr.Button("🛡️ Security (5)", size="sm", variant="secondary")
with gr.Row():
btn_small = gr.Button("Small (4)", size="sm")
btn_medium = gr.Button("Medium (4)", size="sm")
with gr.Row():
btn_all = gr.Button("Select All (25)", size="sm")
btn_clear = gr.Button("Clear All", size="sm")
# Model checkboxes by category
model_checkboxes = []
for category, models in MODELS_BY_CATEGORY.items():
gr.Markdown(f"**{category}**")
for model in models:
short_name = model.split('/')[-1]
cb = gr.Checkbox(label=f"{short_name}", value=False)
model_checkboxes.append((cb, model))
gr.Markdown("### ⚡ GPU Limits")
gr.Markdown("""
**Free tier: 60-second limit**
- ✅ 1-2 models: Safe
- ⚠️ 3-5 models: May timeout
- ❌ 6+ models: Will timeout
""")
with gr.Column(scale=2):
gr.Markdown("### 📊 Persistent Leaderboard")
gr.Markdown("""
**💾 Results persist across sessions!** Run models one at a time to build up a complete leaderboard.
- New runs merge with existing results
- Best score per model is kept
- Perfect for avoiding GPU timeouts
""")
clear_status = gr.Markdown("")
clear_btn = gr.Button("🗑️ Clear All Results", size="sm", variant="stop")
results_table = gr.Dataframe(
label="Leaderboard",
headers=["Rank", "Model", "Accuracy", "Correct/Total", "Status"],
interactive=False
)
comparison_plot = gr.Plot(label="Model Comparison")
download_file = gr.File(label="Download Results (JSON)")
# Quick select button actions
def select_recommended():
return [gr.update(value=(model in MODELS_BY_CATEGORY["✅ Recommended (Tested)"]))
for cb, model in model_checkboxes]
def select_security():
return [gr.update(value=(model in MODELS_BY_CATEGORY["🛡️ Cybersecurity-Focused"]))
for cb, model in model_checkboxes]
def select_small():
return [gr.update(value=(model in MODELS_BY_CATEGORY["Small Models (1-4B)"]))
for cb, model in model_checkboxes]
def select_medium():
return [gr.update(value=(model in MODELS_BY_CATEGORY["Medium Models (7-12B)"]))
for cb, model in model_checkboxes]
def select_all():
return [gr.update(value=True) for _ in model_checkboxes]
def clear_all():
return [gr.update(value=False) for _ in model_checkboxes]
btn_recommended.click(select_recommended, outputs=[cb for cb, _ in model_checkboxes])
btn_security.click(select_security, outputs=[cb for cb, _ in model_checkboxes])
btn_small.click(select_small, outputs=[cb for cb, _ in model_checkboxes])
btn_medium.click(select_medium, outputs=[cb for cb, _ in model_checkboxes])
btn_all.click(select_all, outputs=[cb for cb, _ in model_checkboxes])
btn_clear.click(clear_all, outputs=[cb for cb, _ in model_checkboxes])
# Run evaluation
def prepare_evaluation(*checkbox_values):
selected = [model for (cb, model), val in zip(model_checkboxes, checkbox_values) if val]
return selected
def evaluation_wrapper(*args):
"""Wrapper to handle checkbox inputs and call run_evaluation as generator"""
selected = prepare_evaluation(*args[:-4])
yield from run_evaluation(
selected,
int(args[-4]),
args[-3],
args[-2],
int(args[-1])
)
run_btn.click(
fn=evaluation_wrapper,
inputs=[cb for cb, _ in model_checkboxes] + [num_samples, use_4bit, temperature, max_tokens],
outputs=[results_table, comparison_plot, download_file]
)
# Clear results button
clear_btn.click(
fn=clear_persistent_results,
outputs=[clear_status, results_table, comparison_plot, download_file]
)
# Load persistent leaderboard on startup
app.load(
fn=load_initial_leaderboard,
outputs=[results_table, comparison_plot, download_file]
)
gr.Markdown("""
---
**Dataset:** [Zen0/AusCyberBench](https://huggingface.co/datasets/Zen0/AusCyberBench) • 13,449 tasks |
**Models:** 25 open LLMs (no gated models) |
**License:** MIT
""")
if __name__ == "__main__":
app.queue().launch()