LLM4HEP / util /analyze_particles.py
ho22joshua's picture
initial commit
cfcbbc8
#!/usr/bin/env python3
"""
Generic particle analyzer for ROOT files
Usage: python analyze_particles.py <prefix> [filepath]
Examples:
python analyze_particles.py lep
python analyze_particles.py photon
python analyze_particles.py tau
python analyze_particles.py jet
"""
import sys
import os
import uproot
import numpy as np
import argparse
def get_available_prefixes(filepath):
"""Get all available particle prefixes in the ROOT file"""
with uproot.open(filepath) as file:
tree = file['mini;1']
branches = list(tree.keys())
prefixes = set()
for branch in branches:
if '_' in branch:
prefix = branch.split('_')[0]
prefixes.add(prefix)
return sorted(list(prefixes))
def analyze_particles(filepath, prefix, max_events=None):
"""Analyze particle properties in detail"""
print(f"Analyzing {prefix} properties in: {filepath}")
print("=" * 60)
with uproot.open(filepath) as file:
tree = file['mini;1']
branches = list(tree.keys())
# Find all branches with the given prefix
prefix_branches = [b for b in branches if b.startswith(prefix + '_')]
if not prefix_branches:
print(f"No branches found with prefix '{prefix}_'")
print(f"Available prefixes: {get_available_prefixes(filepath)}")
return
print(f"Found {len(prefix_branches)} branches with prefix '{prefix}_':")
for branch in sorted(prefix_branches):
print(f" - {branch}")
print()
# Load data
data = {}
for branch in prefix_branches:
try:
data[branch] = tree[branch].array()
if max_events:
data[branch] = data[branch][:max_events]
except Exception as e:
print(f"Warning: Could not load {branch}: {e}")
continue
if not data:
print("No data could be loaded!")
return
n_events = len(data[list(data.keys())[0]])
print(f"Total events analyzed: {n_events}")
print()
# Analyze multiplicity if available
multiplicity_branch = f"{prefix}_n"
has_variable_multiplicity = multiplicity_branch in data
# For photons, check if ID variables are stored as fixed arrays
has_fixed_multiplicity = False
if prefix == 'photon':
# Check if identification variables exist and are 2D
id_branches = [b for b in prefix_branches if b in [f'{prefix}_isTightID', f'{prefix}_truthMatched', f'{prefix}_trigMatched']]
if id_branches:
sample_id = data[id_branches[0]]
# Check if it's a 2D array (events × 2 photons) using awkward array properties
try:
# For awkward arrays, check if we can access [:,1] (second photon)
test_access = sample_id[:,1]
has_fixed_multiplicity = True
except:
has_fixed_multiplicity = False
if has_variable_multiplicity:
analyze_multiplicity(data[multiplicity_branch], prefix)
# Analyze kinematic variables
kinematic_vars = ['pt', 'eta', 'phi', 'E', 'm']
for var in kinematic_vars:
branch_name = f"{prefix}_{var}"
if branch_name in data:
analyze_kinematic(data[branch_name], var.upper(), prefix,
has_variable_multiplicity, has_fixed_multiplicity)
# Analyze identification variables
id_vars = ['type', 'charge', 'isTightID', 'truthMatched', 'trigMatched']
for var in id_vars:
branch_name = f"{prefix}_{var}"
if branch_name in data:
analyze_identification(data[branch_name], var, prefix,
has_variable_multiplicity, has_fixed_multiplicity, prefix)
def analyze_multiplicity(mult_data, prefix):
"""Analyze particle multiplicity"""
print(f"{prefix.upper()} multiplicity distribution:")
unique, counts = np.unique(mult_data, return_counts=True)
for n, count in zip(unique, counts):
percentage = count / len(mult_data) * 100
print(" {} {}(s): {:6d} events ({:.1f}%)".format(n, prefix, count, percentage))
print()
def analyze_kinematic(var_data, var_name, prefix, has_variable_multiplicity=False, has_fixed_multiplicity=False):
"""Analyze kinematic variables"""
print(f"{prefix.upper()} {var_name} analysis:")
if has_variable_multiplicity:
# Handle variable number of particles per event
all_values = []
leading_values = []
subleading_values = []
for event_values in var_data:
if len(event_values) > 0:
# Sort by pT if this is pT, otherwise just take as is
if var_name == 'PT':
sorted_values = sorted(event_values, reverse=True)
else:
sorted_values = event_values
all_values.extend(sorted_values)
# Store leading and subleading
if len(sorted_values) >= 1:
leading_values.append(sorted_values[0])
if len(sorted_values) >= 2:
subleading_values.append(sorted_values[1])
values = np.array(all_values)
leading = np.array(leading_values) if leading_values else None
subleading = np.array(subleading_values) if subleading_values else None
print(f" Total number of {prefix}(s): {len(values)}")
print(f" Events with ≥1 {prefix}: {len(leading) if leading is not None else 0}")
print(f" Events with ≥2 {prefix}(s): {len(subleading) if subleading is not None else 0}")
elif has_fixed_multiplicity:
# Handle fixed number of particles per event (like exactly 2 photons)
values = np.array(var_data)
if var_name == 'PT':
# For fixed multiplicity, assume first column is leading, second is subleading
leading = values[:, 0] if values.shape[1] > 0 else None
subleading = values[:, 1] if values.shape[1] > 1 else None
else:
leading = values[:, 0] if values.shape[1] > 0 else None
subleading = values[:, 1] if values.shape[1] > 1 else None
print(f" Total number of {prefix}(s): {len(values)}")
else:
# Handle single values per event
values = np.array(var_data)
leading = None
subleading = None
print(f" Total number of {prefix}(s): {len(values)}")
if len(values) == 0:
print(" No data available")
return
# Convert to GeV if it's energy/momentum
if var_name in ['PT', 'E', 'M']:
values_gev = values / 1000
leading_gev = leading / 1000 if leading is not None else None
subleading_gev = subleading / 1000 if subleading is not None else None
unit = "GeV"
else:
values_gev = values
leading_gev = leading
subleading_gev = subleading
unit = ""
print(f" {var_name} statistics ({unit}) - All {prefix}(s):")
print(" Mean: {:.3f}".format(np.mean(values_gev)))
print(" Median: {:.3f}".format(np.median(values_gev)))
print(" Min: {:.3f}".format(np.min(values_gev)))
print(" Max: {:.3f}".format(np.max(values_gev)))
print(" Std: {:.3f}".format(np.std(values_gev)))
# Show leading particle stats
if leading_gev is not None and len(leading_gev) > 0:
print(f" {var_name} statistics ({unit}) - Leading {prefix}:")
print(" Mean: {:.3f}".format(np.mean(leading_gev)))
print(" Median: {:.3f}".format(np.median(leading_gev)))
print(" Min: {:.3f}".format(np.min(leading_gev)))
print(" Max: {:.3f}".format(np.max(leading_gev)))
print(" Std: {:.3f}".format(np.std(leading_gev)))
# Show subleading particle stats
if subleading_gev is not None and len(subleading_gev) > 0:
print(f" {var_name} statistics ({unit}) - Subleading {prefix}:")
print(" Mean: {:.3f}".format(np.mean(subleading_gev)))
print(" Median: {:.3f}".format(np.median(subleading_gev)))
print(" Min: {:.3f}".format(np.min(subleading_gev)))
print(" Max: {:.3f}".format(np.max(subleading_gev)))
print(" Std: {:.3f}".format(np.std(subleading_gev)))
# Show ratio between leading and subleading if both exist
if leading_gev is not None and len(leading_gev) == len(subleading_gev):
ratio = subleading_gev / leading_gev
print(f" {var_name} ratio (Subleading/Leading):")
print(" Mean: {:.3f}".format(np.mean(ratio)))
print(" Median: {:.3f}".format(np.median(ratio)))
print(" Min: {:.3f}".format(np.min(ratio)))
print(" Max: {:.3f}".format(np.max(ratio)))
print()
def analyze_identification(var_data, var_name, prefix, has_variable_multiplicity=False, has_fixed_multiplicity=False, particle_prefix=None):
"""Analyze identification variables"""
print(f"{prefix.upper()} {var_name} analysis:")
# For photons, prioritize fixed multiplicity logic even if variable multiplicity exists
use_fixed_multiplicity = has_fixed_multiplicity and (particle_prefix == 'photon' or not has_variable_multiplicity)
if use_fixed_multiplicity:
# Handle fixed number of particles per event (like exactly 2 photons)
values = np.array(var_data)
# For fixed multiplicity, analyze leading and subleading separately
if values.shape[1] >= 2:
leading_values = values[:, 0]
subleading_values = values[:, 1]
print(f" Overall {var_name} distribution:")
analyze_id_distribution(values.flatten(), var_name, prefix)
print(f" Leading {prefix} {var_name} distribution:")
analyze_id_distribution(leading_values, var_name, prefix)
print(f" Subleading {prefix} {var_name} distribution:")
analyze_id_distribution(subleading_values, var_name, prefix)
# Show correlation between leading and subleading
if var_name in ['isTightID', 'truthMatched', 'trigMatched']:
analyze_correlation(leading_values, subleading_values, var_name, prefix)
print()
return
elif has_variable_multiplicity:
# Handle variable number of particles per event
all_values = []
for event_values in var_data:
if len(event_values) > 0:
all_values.extend(event_values)
values = np.array(all_values)
else:
# Handle single values per event
values = np.array(var_data)
# For variable multiplicity or single values
analyze_id_distribution(values, var_name, prefix)
print()
def analyze_id_distribution(values, var_name, prefix):
"""Analyze a single identification variable distribution"""
if var_name == 'type':
analyze_particle_types(values, prefix)
elif var_name in ['isTightID', 'truthMatched', 'trigMatched']:
analyze_boolean_flags(values, var_name, prefix)
elif var_name == 'charge':
analyze_charges(values, prefix)
else:
# Generic analysis
unique, counts = np.unique(values, return_counts=True)
total = len(values)
print(f" Distribution:")
for val, count in zip(unique[:10], counts[:10]): # Show first 10
percentage = count / total * 100
print(" {}: {:6d} ({:.1f}%)".format(val, count, percentage))
if len(unique) > 10:
print(f" ... and {len(unique) - 10} more values")
def analyze_correlation(leading, subleading, var_name, prefix):
"""Analyze correlation between leading and subleading particle properties"""
print(f" {prefix.upper()} {var_name} correlation (Leading × Subleading):")
# Create contingency table
both_true = np.sum((leading == True) & (subleading == True))
leading_true_sub_false = np.sum((leading == True) & (subleading == False))
leading_false_sub_true = np.sum((leading == False) & (subleading == True))
both_false = np.sum((leading == False) & (subleading == False))
total = len(leading)
print(" Both True: {:6d} ({:.1f}%)".format(both_true, both_true/total*100))
print(" Leading True, Subleading False: {:6d} ({:.1f}%)".format(
leading_true_sub_false, leading_true_sub_false/total*100))
print(" Leading False, Subleading True: {:6d} ({:.1f}%)".format(
leading_false_sub_true, leading_false_sub_true/total*100))
print(" Both False: {:6d} ({:.1f}%)".format(both_false, both_false/total*100))
def analyze_particle_types(types, prefix):
"""Analyze particle types"""
type_dict = {11: 'electron', 13: 'muon', 15: 'tau', 22: 'photon'}
print(f" {prefix.upper()} type distribution:")
unique_types, counts = np.unique(types, return_counts=True)
for ptype, count in zip(unique_types, counts):
type_name = type_dict.get(ptype, f'unknown({ptype})')
percentage = count / len(types) * 100
print(" {}: {:6d} ({:.1f}%)".format(type_name, count, percentage))
print()
def analyze_boolean_flags(flags, flag_name, prefix):
"""Analyze boolean flags"""
true_count = np.sum(flags)
false_count = len(flags) - true_count
true_pct = true_count / len(flags) * 100
false_pct = false_count / len(flags) * 100
print(f" {prefix.upper()} {flag_name} distribution:")
print(" True: {:6d} ({:.1f}%)".format(true_count, true_pct))
print(" False: {:6d} ({:.1f}%)".format(false_count, false_pct))
print()
def analyze_charges(charges, prefix):
"""Analyze particle charges"""
unique_charges, counts = np.unique(charges, return_counts=True)
print(f" {prefix.upper()} charge distribution:")
for charge, count in zip(unique_charges, counts):
percentage = count / len(charges) * 100
print(" {}: {:6d} ({:.1f}%)".format(charge, count, percentage))
print()
def main():
parser = argparse.ArgumentParser(description='Generic particle analyzer for ROOT files')
parser.add_argument('--list-prefixes', action='store_true',
help='List all available prefixes in the file')
parser.add_argument('prefix', nargs='?', help='Particle prefix (e.g., lep, photon, tau, jet)')
parser.add_argument('filepath', nargs='?',
default="/global/cfs/projectdirs/atlas/eligd/llm_for_analysis_copy/data/mc_341081.ttH125_gamgam.GamGam.root",
help='Path to ROOT file')
parser.add_argument('--max-events', type=int, help='Limit analysis to first N events')
args = parser.parse_args()
if args.list_prefixes:
if not os.path.exists(args.filepath):
print(f"Error: File '{args.filepath}' does not exist!")
return
print("Available prefixes in the file:")
prefixes = get_available_prefixes(args.filepath)
for prefix in prefixes:
print(f" - {prefix}")
return
if not args.prefix:
print("Error: Please specify a particle prefix (e.g., lep, photon, tau, jet)")
print("Use --list-prefixes to see available options")
return
if not os.path.exists(args.filepath):
print(f"Error: File '{args.filepath}' does not exist!")
return
analyze_particles(args.filepath, args.prefix, args.max_events)
if __name__ == "__main__":
main()