# apertus-swiss-transparency / src / transparency_analyzer.py
# Author: Markus Clauss, DIRU Vetsuisse
# Initial commit - Apertus Swiss AI Transparency Dashboard (b65eda7)
"""
Advanced transparency analysis tools for Apertus Swiss AI
Provides deep introspection into model decision-making processes
"""
import torch
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from typing import Dict, List, Tuple, Optional, Any
import logging
try:
from .apertus_core import ApertusCore
except ImportError:
from apertus_core import ApertusCore
logger = logging.getLogger(__name__)
class ApertusTransparencyAnalyzer:
    """
    Advanced transparency analysis for Apertus models

    Enables complete introspection into neural network operations,
    attention patterns, hidden states, and decision processes.
    """

    def __init__(self, apertus_core: Optional["ApertusCore"] = None):
        """
        Initialize transparency analyzer

        Args:
            apertus_core: Initialized ApertusCore instance, or None to create new
        """
        if apertus_core is None:
            self.apertus = ApertusCore(enable_transparency=True)
        else:
            self.apertus = apertus_core

        # Warn early when the model config does not request attention outputs;
        # the per-call `output_attentions=True` override below still works, but
        # other transparency features may not.
        if not (hasattr(self.apertus.model, 'config') and
                getattr(self.apertus.model.config, 'output_attentions', False)):
            logger.warning("Model not configured for transparency analysis. Some features may not work.")

    def analyze_model_architecture(self) -> Dict[str, Any]:
        """
        Comprehensive analysis of model architecture

        Returns:
            Dictionary containing detailed architecture information
            (basic config, parameter counts, per-layer breakdown)
        """
        logger.info("🔍 Analyzing Apertus model architecture...")
        config = self.apertus.model.config

        # Basic architecture info
        architecture = {
            "model_type": config.model_type,
            "num_hidden_layers": config.num_hidden_layers,
            "num_attention_heads": config.num_attention_heads,
            "hidden_size": config.hidden_size,
            "intermediate_size": config.intermediate_size,
            "vocab_size": config.vocab_size,
            "max_position_embeddings": config.max_position_embeddings,
        }

        # Parameter analysis
        total_params = sum(p.numel() for p in self.apertus.model.parameters())
        trainable_params = sum(p.numel() for p in self.apertus.model.parameters() if p.requires_grad)
        # Exact in-memory size from each parameter's actual dtype; the previous
        # fixed 2-bytes-per-parameter estimate was only correct for float16.
        param_bytes = sum(p.numel() * p.element_size() for p in self.apertus.model.parameters())
        architecture.update({
            "total_parameters": total_params,
            "trainable_parameters": trainable_params,
            "model_size_gb": param_bytes / 1e9,
        })

        # Layer breakdown: record every leaf module that owns a weight tensor
        layer_info = {}
        for name, module in self.apertus.model.named_modules():
            if hasattr(module, 'weight') and len(list(module.parameters())) > 0:
                params = sum(p.numel() for p in module.parameters())
                layer_info[name] = {
                    "parameters": params,
                    "shape": list(module.weight.shape) if hasattr(module, 'weight') else None,
                    "dtype": str(module.weight.dtype) if hasattr(module, 'weight') else None
                }
        architecture["layer_breakdown"] = layer_info

        # Print summary
        print("🏗️ APERTUS ARCHITECTURE ANALYSIS")
        print("=" * 60)
        print(f"Model Type: {architecture['model_type']}")
        print(f"Layers: {architecture['num_hidden_layers']}")
        print(f"Attention Heads: {architecture['num_attention_heads']}")
        print(f"Hidden Size: {architecture['hidden_size']}")
        print(f"Vocabulary: {architecture['vocab_size']:,} tokens")
        print(f"Total Parameters: {total_params:,}")
        print(f"Model Size: ~{architecture['model_size_gb']:.2f} GB")
        return architecture

    def visualize_attention_patterns(
        self,
        text: str,
        layer: int = 15,
        head: Optional[int] = None,
        save_path: Optional[str] = None
    ) -> Tuple[np.ndarray, List[str]]:
        """
        Visualize attention patterns for given text

        Args:
            text: Input text to analyze
            layer: Which transformer layer to analyze (0 to num_layers-1)
            head: Specific attention head (None for average across heads)
            save_path: Optional path to save visualization

        Returns:
            Tuple of (attention_matrix, tokens)
        """
        logger.info(f"🎯 Analyzing attention patterns for: '{text}'")

        # Tokenize input
        inputs = self.apertus.tokenizer(text, return_tensors="pt")
        tokens = self.apertus.tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])

        # Move inputs to model device
        device = next(self.apertus.model.parameters()).device
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Get model outputs with attention
        with torch.no_grad():
            outputs = self.apertus.model(**inputs, output_attentions=True)

        # Clamp an out-of-range layer index. Warn with the *requested* value
        # before overwriting it (previously the clamp happened first, so the
        # warning reported the replacement layer as the unavailable one).
        if layer >= len(outputs.attentions):
            logger.warning(f"Layer {layer} not available, using layer {len(outputs.attentions) - 1}")
            layer = len(outputs.attentions) - 1
        attention_weights = outputs.attentions[layer][0]  # [num_heads, seq_len, seq_len]

        # Average across heads or select specific head
        if head is None:
            attention_matrix = attention_weights.mean(dim=0).cpu().numpy()
            title_suffix = f"Layer {layer} (All Heads Average)"
        else:
            if head >= attention_weights.shape[0]:
                # Same warn-then-clamp ordering as the layer check above.
                logger.warning(f"Head {head} not available, using head 0")
                head = 0
            attention_matrix = attention_weights[head].cpu().numpy()
            title_suffix = f"Layer {layer}, Head {head}"

        # Create visualization
        plt.figure(figsize=(12, 10))
        sns.heatmap(
            attention_matrix,
            xticklabels=tokens,
            yticklabels=tokens,
            cmap='Blues',
            cbar_kws={'label': 'Attention Weight'},
            square=True
        )
        plt.title(f'Attention Patterns - {title_suffix}')
        plt.xlabel('Key Tokens (what it looks at)')
        plt.ylabel('Query Tokens (what is looking)')
        plt.xticks(rotation=45, ha='right')
        plt.yticks(rotation=0)
        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            logger.info(f"Attention visualization saved to {save_path}")
        plt.show()

        # Print attention insights
        print(f"\n🔍 ATTENTION INSIGHTS FOR: '{text}'")
        print("=" * 60)
        print(f"Attention Matrix Shape: {attention_matrix.shape}")
        print(f"Max Attention Weight: {attention_matrix.max():.4f}")
        print(f"Average Attention Weight: {attention_matrix.mean():.4f}")
        print(f"Attention Spread (std): {attention_matrix.std():.4f}")

        # Show where each of the first few query tokens attends the most
        print("\n🎯 TOP ATTENTION PATTERNS:")
        for i, token in enumerate(tokens[:min(5, len(tokens))]):
            if i < attention_matrix.shape[0]:
                top_attention_idx = attention_matrix[i].argmax()
                top_attention_token = tokens[top_attention_idx] if top_attention_idx < len(tokens) else "N/A"
                attention_score = attention_matrix[i][top_attention_idx]
                print(f"  '{token}' → '{top_attention_token}' ({attention_score:.3f})")
        return attention_matrix, tokens

    def trace_hidden_states(
        self,
        text: str,
        analyze_layers: Optional[List[int]] = None
    ) -> Dict[int, Dict[str, Any]]:
        """
        Track evolution of hidden states through model layers

        Args:
            text: Input text to analyze
            analyze_layers: Specific layers to analyze (None for key layers)

        Returns:
            Dictionary mapping layer indices to analysis results
        """
        logger.info(f"🧠 Tracing hidden state evolution for: '{text}'")

        # Default to key layers (first, quartiles, last) if none specified.
        # De-duplicate via a set in case the model is so small that the
        # quartile indices collide (e.g. num_layers < 5).
        if analyze_layers is None:
            num_layers = self.apertus.model.config.num_hidden_layers
            analyze_layers = sorted({0, num_layers // 4, num_layers // 2,
                                     3 * num_layers // 4, num_layers - 1})

        # Tokenize input
        inputs = self.apertus.tokenizer(text, return_tensors="pt")
        tokens = self.apertus.tokenizer.convert_ids_to_tokens(inputs['input_ids'][0])

        # Move inputs to model device
        device = next(self.apertus.model.parameters()).device
        inputs = {k: v.to(device) for k, v in inputs.items()}

        # Get hidden states
        with torch.no_grad():
            outputs = self.apertus.model(**inputs, output_hidden_states=True)
        hidden_states = outputs.hidden_states

        layer_analysis = {}
        print(f"\n🔄 HIDDEN STATE EVOLUTION FOR: '{text}'")
        print("=" * 60)
        for layer_idx in analyze_layers:
            if layer_idx >= len(hidden_states):
                continue
            layer_states = hidden_states[layer_idx][0]  # Remove batch dimension

            # Calculate statistics for each token
            token_stats = []
            for i, token in enumerate(tokens):
                if i < layer_states.shape[0]:
                    token_vector = layer_states[i].cpu().numpy()
                    stats = {
                        'token': token,
                        'mean_activation': np.mean(token_vector),
                        'std_activation': np.std(token_vector),
                        'max_activation': np.max(token_vector),
                        'min_activation': np.min(token_vector),
                        'l2_norm': np.linalg.norm(token_vector),
                        'activation_range': np.max(token_vector) - np.min(token_vector)
                    }
                    token_stats.append(stats)

            # Layer-level statistics
            layer_stats = {
                'avg_l2_norm': np.mean([s['l2_norm'] for s in token_stats]),
                'max_l2_norm': np.max([s['l2_norm'] for s in token_stats]),
                'avg_activation': np.mean([s['mean_activation'] for s in token_stats]),
                'activation_spread': np.std([s['mean_activation'] for s in token_stats])
            }
            layer_analysis[layer_idx] = {
                'token_stats': token_stats,
                'layer_stats': layer_stats,
                'hidden_state_shape': layer_states.shape
            }

            # Print layer summary
            print(f"\nLayer {layer_idx}:")
            print(f"  Hidden State Shape: {layer_states.shape}")
            print(f"  Average L2 Norm: {layer_stats['avg_l2_norm']:.4f}")
            print(f"  Peak L2 Norm: {layer_stats['max_l2_norm']:.4f}")
            print(f"  Average Activation: {layer_stats['avg_activation']:.4f}")

            # Show strongest tokens (by representation magnitude)
            sorted_tokens = sorted(token_stats, key=lambda x: x['l2_norm'], reverse=True)
            print(f"  Strongest Tokens:")
            for i, stats in enumerate(sorted_tokens[:3]):
                print(f"    {i+1}. '{stats['token']}' (L2: {stats['l2_norm']:.4f})")

        # Visualize evolution
        self._plot_hidden_state_evolution(layer_analysis, analyze_layers, tokens)
        return layer_analysis

    def _plot_hidden_state_evolution(
        self,
        layer_analysis: Dict[int, Dict[str, Any]],
        layers: List[int],
        tokens: List[str]
    ):
        """Plot hidden state evolution across layers (2x2 summary figure)."""
        plt.figure(figsize=(14, 8))

        # Plot 1: Average L2 norms across layers
        plt.subplot(2, 2, 1)
        avg_norms = [layer_analysis[layer]['layer_stats']['avg_l2_norm'] for layer in layers]
        plt.plot(layers, avg_norms, 'bo-', linewidth=2, markersize=8)
        plt.xlabel('Layer')
        plt.ylabel('Average L2 Norm')
        plt.title('Representation Strength Evolution')
        plt.grid(True, alpha=0.3)

        # Plot 2: Token-specific evolution (first 5 tokens)
        plt.subplot(2, 2, 2)
        for token_idx in range(min(5, len(tokens))):
            token_norms = []
            for layer in layers:
                if token_idx < len(layer_analysis[layer]['token_stats']):
                    norm = layer_analysis[layer]['token_stats'][token_idx]['l2_norm']
                    token_norms.append(norm)
                else:
                    token_norms.append(0)
            plt.plot(layers, token_norms, 'o-', label=f"'{tokens[token_idx]}'", linewidth=1.5)
        plt.xlabel('Layer')
        plt.ylabel('L2 Norm')
        plt.title('Token-Specific Evolution')
        plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left')
        plt.grid(True, alpha=0.3)

        # Plot 3: Activation spread
        plt.subplot(2, 2, 3)
        spreads = [layer_analysis[layer]['layer_stats']['activation_spread'] for layer in layers]
        plt.plot(layers, spreads, 'ro-', linewidth=2, markersize=8)
        plt.xlabel('Layer')
        plt.ylabel('Activation Spread (std)')
        plt.title('Representation Diversity')
        plt.grid(True, alpha=0.3)

        # Plot 4: Peak vs Average activations
        plt.subplot(2, 2, 4)
        avg_norms = [layer_analysis[layer]['layer_stats']['avg_l2_norm'] for layer in layers]
        max_norms = [layer_analysis[layer]['layer_stats']['max_l2_norm'] for layer in layers]
        plt.plot(layers, avg_norms, 'bo-', label='Average', linewidth=2)
        plt.plot(layers, max_norms, 'ro-', label='Peak', linewidth=2)
        plt.xlabel('Layer')
        plt.ylabel('L2 Norm')
        plt.title('Peak vs Average Activations')
        plt.legend()
        plt.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def analyze_token_predictions(
        self,
        prompt: str,
        max_new_tokens: int = 5,
        temperature: float = 0.7,
        show_top_k: int = 10
    ) -> List[Dict[str, Any]]:
        """
        Analyze step-by-step token prediction process

        Args:
            prompt: Initial prompt
            max_new_tokens: Number of tokens to generate and analyze
            temperature: Sampling temperature
            show_top_k: Number of top candidates to show for each step

        Returns:
            List of prediction steps with probabilities and selections
        """
        logger.info(f"🎲 Analyzing token predictions for: '{prompt}'")
        print(f"\n🎲 TOKEN PREDICTION ANALYSIS")
        print("=" * 60)
        print(f"Prompt: '{prompt}'")
        print(f"Temperature: {temperature}")

        # Encode initial prompt and move it to the model's device; previously
        # the ids stayed on CPU, which crashed when the model lives on GPU.
        device = next(self.apertus.model.parameters()).device
        input_ids = self.apertus.tokenizer.encode(prompt, return_tensors="pt").to(device)

        generation_steps = []
        for step in range(max_new_tokens):
            print(f"\n--- STEP {step + 1} ---")

            # Get model predictions
            with torch.no_grad():
                outputs = self.apertus.model(input_ids)
                logits = outputs.logits[0, -1, :]  # Last token's predictions

            # Apply temperature and convert to probabilities
            scaled_logits = logits / temperature
            probabilities = torch.nn.functional.softmax(scaled_logits, dim=-1)

            # Get top candidates
            top_probs, top_indices = torch.topk(probabilities, show_top_k)

            # Create step data
            step_data = {
                'step': step + 1,
                'current_text': self.apertus.tokenizer.decode(input_ids[0]),
                'candidates': [],
                'logits_stats': {
                    'max_logit': logits.max().item(),
                    'min_logit': logits.min().item(),
                    'mean_logit': logits.mean().item(),
                    'std_logit': logits.std().item()
                }
            }
            print(f"Current text: '{step_data['current_text']}'")
            print(f"\nTop {show_top_k} Token Candidates:")
            for i in range(show_top_k):
                token_id = top_indices[i].item()
                token = self.apertus.tokenizer.decode([token_id])
                prob = top_probs[i].item()
                logit = logits[token_id].item()
                candidate = {
                    'rank': i + 1,
                    'token': token,
                    'token_id': token_id,
                    'probability': prob,
                    'logit': logit
                }
                step_data['candidates'].append(candidate)

                # Visual indicators for probability ranges
                if prob > 0.3:
                    indicator = "🔥"  # High confidence
                elif prob > 0.1:
                    indicator = "✅"  # Medium confidence
                elif prob > 0.05:
                    indicator = "⚠️"  # Low confidence
                else:
                    indicator = "❓"  # Very low confidence
                print(f"  {i+1:2d}. '{token}' - {prob:.1%} (logit: {logit:.2f}) {indicator}")

            # Sample next token from the full (temperature-scaled) distribution
            next_token_id = torch.multinomial(probabilities, 1)
            next_token = self.apertus.tokenizer.decode([next_token_id.item()])

            # Find rank of selected token among the displayed candidates
            selected_rank = "N/A"
            if next_token_id in top_indices:
                selected_rank = (top_indices == next_token_id).nonzero().item() + 1
            step_data['selected_token'] = next_token
            step_data['selected_token_id'] = next_token_id.item()
            step_data['selected_rank'] = selected_rank
            print(f"\n🎯 SELECTED: '{next_token}' (rank: {selected_rank})")
            generation_steps.append(step_data)

            # Update input for next iteration
            input_ids = torch.cat([input_ids, next_token_id.unsqueeze(0)], dim=-1)

        # Final result
        final_text = self.apertus.tokenizer.decode(input_ids[0])
        print(f"\n✨ FINAL GENERATED TEXT: '{final_text}'")
        return generation_steps

    def weight_analysis(
        self,
        layer_name: str = "model.layers.15.self_attn.q_proj",
        sample_size: int = 100
    ) -> Optional[np.ndarray]:
        """
        Analyze specific layer weights

        Args:
            layer_name: Name of the layer to analyze
            sample_size: Size of sample for visualization

        Returns:
            Weight matrix if successful, None if layer not found
        """
        logger.info(f"⚖️ Analyzing weights for layer: {layer_name}")
        print(f"\n⚖️ WEIGHT ANALYSIS: {layer_name}")
        print("=" * 60)
        try:
            # Get the specified layer. AttributeError is caught as well as
            # KeyError: a container module found by name has no `.weight`.
            layer = dict(self.apertus.model.named_modules())[layer_name]
            weights = layer.weight.data.cpu().numpy()
            print(f"Weight Matrix Shape: {weights.shape}")
            print(f"Weight Statistics:")
            print(f"  Mean: {np.mean(weights):.6f}")
            print(f"  Std: {np.std(weights):.6f}")
            print(f"  Min: {np.min(weights):.6f}")
            print(f"  Max: {np.max(weights):.6f}")
            print(f"  Total Parameters: {weights.size:,}")
            print(f"  Memory Usage: {weights.nbytes / 1024**2:.2f} MB")

            # Create visualizations
            self._plot_weight_analysis(weights, layer_name, sample_size)
            return weights
        except (KeyError, AttributeError):
            print(f"❌ Layer '{layer_name}' not found!")
            print("\n📋 Available layers:")
            for name, module in self.apertus.model.named_modules():
                if hasattr(module, 'weight'):
                    print(f"  {name}")
            return None

    def _plot_weight_analysis(
        self,
        weights: np.ndarray,
        layer_name: str,
        sample_size: int
    ):
        """Plot weight analysis visualizations (2x3 summary figure)."""
        plt.figure(figsize=(15, 10))

        # Plot 1: Weight distribution
        plt.subplot(2, 3, 1)
        plt.hist(weights.flatten(), bins=50, alpha=0.7, edgecolor='black', color='skyblue')
        plt.title(f'Weight Distribution\n{layer_name}')
        plt.xlabel('Weight Value')
        plt.ylabel('Frequency')
        plt.grid(True, alpha=0.3)

        # Plot 2: Weight matrix heatmap (sample)
        plt.subplot(2, 3, 2)
        if len(weights.shape) > 1:
            sample_weights = weights[:sample_size, :sample_size]
        else:
            sample_weights = weights[:sample_size].reshape(-1, 1)
        plt.imshow(sample_weights, cmap='RdBu', vmin=-0.1, vmax=0.1, aspect='auto')
        plt.title(f'Weight Matrix Sample\n({sample_size}x{sample_size})')
        plt.colorbar(label='Weight Value')

        # Plot 3: Row-wise statistics
        plt.subplot(2, 3, 3)
        if len(weights.shape) > 1:
            row_means = np.mean(weights, axis=1)
            row_stds = np.std(weights, axis=1)
            plt.plot(row_means, label='Row Means', alpha=0.7)
            plt.plot(row_stds, label='Row Stds', alpha=0.7)
            plt.title('Row-wise Statistics')
            plt.xlabel('Row Index')
            plt.ylabel('Value')
            plt.legend()
            plt.grid(True, alpha=0.3)

        # Plot 4: Weight magnitude distribution
        plt.subplot(2, 3, 4)
        weight_magnitudes = np.abs(weights.flatten())
        plt.hist(weight_magnitudes, bins=50, alpha=0.7, edgecolor='black', color='lightcoral')
        plt.title('Weight Magnitude Distribution')
        plt.xlabel('|Weight Value|')
        plt.ylabel('Frequency')
        plt.grid(True, alpha=0.3)

        # Plot 5: Sparsity analysis (fraction of near-zero weights vs threshold)
        plt.subplot(2, 3, 5)
        threshold_range = np.logspace(-4, -1, 20)
        sparsity_ratios = []
        for threshold in threshold_range:
            sparse_ratio = np.mean(np.abs(weights) < threshold)
            sparsity_ratios.append(sparse_ratio)
        plt.semilogx(threshold_range, sparsity_ratios, 'o-', linewidth=2)
        plt.title('Sparsity Analysis')
        plt.xlabel('Threshold')
        plt.ylabel('Fraction of Weights Below Threshold')
        plt.grid(True, alpha=0.3)

        # Plot 6: Weight norm by layer section
        plt.subplot(2, 3, 6)
        if len(weights.shape) > 1:
            section_size = max(1, weights.shape[0] // 20)
            section_norms = []
            section_labels = []
            for i in range(0, weights.shape[0], section_size):
                end_idx = min(i + section_size, weights.shape[0])
                section = weights[i:end_idx]
                section_norm = np.linalg.norm(section)
                section_norms.append(section_norm)
                section_labels.append(f"{i}-{end_idx}")
            plt.bar(range(len(section_norms)), section_norms, alpha=0.7, color='lightgreen')
            plt.title('Section-wise L2 Norms')
            plt.xlabel('Weight Section')
            plt.ylabel('L2 Norm')
            plt.xticks(range(0, len(section_labels), max(1, len(section_labels)//5)))
            plt.grid(True, alpha=0.3)

        plt.tight_layout()
        plt.show()

    def get_available_layers(self) -> Dict[str, List[str]]:
        """
        Get list of all available layers for analysis

        Returns:
            Dictionary organizing layers by type
            (keys: attention, mlp, embedding, norm, other)
        """
        layers = {
            "attention": [],
            "mlp": [],
            "embedding": [],
            "norm": [],
            "other": []
        }
        # Classify by substring of the module name; only modules that own a
        # weight tensor are listed (these are what weight_analysis can plot).
        for name, module in self.apertus.model.named_modules():
            if hasattr(module, 'weight'):
                if 'attn' in name:
                    layers["attention"].append(name)
                elif 'mlp' in name or 'feed_forward' in name:
                    layers["mlp"].append(name)
                elif 'embed' in name:
                    layers["embedding"].append(name)
                elif 'norm' in name or 'layer_norm' in name:
                    layers["norm"].append(name)
                else:
                    layers["other"].append(name)
        return layers