AGENTS.md

A Technical Deep-Dive into Multi-Agent Architecture

This document provides a comprehensive technical reference for understanding, building, and debugging agents in the Multi-Agent Research Paper Analysis System. It focuses on agent design patterns, state transformations, error handling, observability, and extensibility.


Table of Contents

  1. Introduction

  2. Agent Architecture Fundamentals

  3. Individual Agent Deep Dives

  4. Cross-Cutting Patterns

  5. Workflow Orchestration

  6. Building New Agents

  7. Agent Comparison Reference

  8. Troubleshooting and Debugging


1. Introduction

The 4-Agent Sequential Pipeline

The Multi-Agent Research Paper Analysis System implements a sequential pipeline of four specialized agents (Retriever, Analyzer, Synthesis, Citation), with a filter step between analysis and synthesis, orchestrated by LangGraph:

User Query → Retriever → Analyzer → Filter → Synthesis → Citation → Output
                 ↓          ↓         ↓          ↓          ↓
                      [LangFuse Tracing for All Nodes]

Each agent:

  • Operates on a shared state dictionary that flows through the pipeline
  • Performs a specialized task (retrieval, analysis, synthesis, citation)
  • Transforms the state by reading inputs and writing outputs
  • Never blocks the workflow - returns partial results on failure
  • Is automatically traced by LangFuse for observability

Agent Design Philosophy

The architecture follows these core principles:

1. Pure Functions, Not Stateful Services

  • Agents are pure functions: run(state) -> state
  • No instance state between invocations
  • Deterministic outputs for same inputs (temperature=0)

2. Resilience Through Graceful Degradation

  • Never raise exceptions from run()
  • Return partial results with degraded confidence scores
  • Append errors to state for debugging
  • Circuit breakers prevent cascading failures

3. Observability by Design

  • All agents decorated with @observe for automatic tracing
  • Three-tier tracing: node-level, agent-level, LLM-level
  • Session IDs track multi-turn conversations
  • Token usage accumulated for cost monitoring

4. Separation of Concerns

  • Agent logic: Domain-specific transformations
  • Node wrappers: Orchestration concerns (tracing, error handling, logging)
  • Workflow graph: Routing and conditional execution

5. Explicit Contracts

  • Pydantic schemas validate all data structures
  • AgentState TypedDict defines state shape
  • msgpack serialization constraints enforced

How Agents Differ from Traditional Microservices

| Aspect | Traditional Microservices | Our Agents |
|---|---|---|
| Communication | HTTP/gRPC between services | Shared state dictionary |
| State | Each service has its own database | Stateless; state flows through the pipeline |
| Failure Handling | Retry with exponential backoff | Graceful degradation with partial results |
| Orchestration | Service mesh, API gateway | LangGraph with conditional routing |
| Observability | Distributed tracing (Jaeger, Zipkin) | LangFuse with automatic instrumentation |
| Deployment | Independent containers | Single process, modular architecture |
| Scaling | Horizontal scaling | Parallel processing within agents (ThreadPoolExecutor) |

Key Insight: Agents are lightweight, composable functions orchestrated by a workflow graph, not heavyweight network services.
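
As a rough illustration of this idea, the sketch below threads one state dictionary through a list of `run(state) -> state` callables. The `retrieve` and `analyze` functions are hypothetical stand-ins, not the project's agents, and the real system delegates this loop to LangGraph rather than a hand-written helper.

```python
from typing import Any, Callable, Dict, List

State = Dict[str, Any]
Step = Callable[[State], State]


def run_pipeline(state: State, steps: List[Step]) -> State:
    """Apply each step in order, threading the shared state through."""
    for step in steps:
        state = step(state)
    return state


# Hypothetical stand-ins for the real agents
def retrieve(state: State) -> State:
    return {**state, "papers": ["paper-1", "paper-2"]}


def analyze(state: State) -> State:
    analyses = [f"analysis of {paper}" for paper in state.get("papers", [])]
    return {**state, "analyses": analyses}


final_state = run_pipeline({"query": "transformers", "errors": []}, [retrieve, analyze])
```

Each step only needs to know which keys it reads and which it writes, which is what makes the steps composable.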


2. Agent Architecture Fundamentals

The Common Agent Interface

All agents implement a consistent interface:

from typing import Dict, Any

class BaseAgent:
    """Base interface for all agents in the system."""

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform the workflow state.

        Args:
            state: Current workflow state (AgentState TypedDict)

        Returns:
            Updated state with new keys/values added

        Raises:
            Never raises - catches all exceptions and appends to state["errors"]
        """
        raise NotImplementedError

Critical Contract Rules:

  1. Never Discard Existing State: Add or update keys and return the resulting dictionary; keys written by earlier agents must survive
  2. Never Raise Exceptions: Catch all exceptions, append to state["errors"]
  3. Always Return State: Even on failure, return state with partial results
  4. Use Pydantic Models: Validate outputs before adding to state
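
A minimal sketch of rules 2 and 3, assuming a hypothetical `SafeAgent` whose `process()` method stands in for real domain logic. The class, its method names, and the `results` key are illustrative only.

```python
import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


class SafeAgent:
    """Hypothetical agent illustrating the 'never raise' contract."""

    def process(self, query: str) -> List[str]:
        # Placeholder domain logic: fails on an empty query
        if not query:
            raise ValueError("empty query")
        return [query.upper()]

    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        state.setdefault("errors", [])
        try:
            state["results"] = self.process(state.get("query", ""))
        except Exception as exc:
            logger.error("SafeAgent failed: %s", exc)
            state["errors"].append(f"SafeAgent failed: {exc}")
            state.setdefault("results", [])  # Partial (empty) result
        return state  # Always return state, even on failure


ok = SafeAgent().run({"query": "rag", "errors": []})
failed = SafeAgent().run({"query": "", "errors": []})
```

The failing call still returns a usable state: `results` is empty and the error message is recorded for debugging.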

State Transformation Contract

Agents follow a clear input/output pattern:

# Input: Read specific keys from state
query = state.get("query")
papers = state.get("papers", [])
category = state.get("category")

# Processing: Transform data using dependencies
results = self.process(query, papers)

# Output: Write new keys to state (never overwrite critical keys)
state["analyses"] = results
state["token_usage"]["input_tokens"] += prompt_tokens
state["errors"].append(error_message)  # Only on error

# Return: Modified state
return state

State Flow Example:

# Initial state (from user input)
{
    "query": "What are recent advances in transformer architectures?",
    "category": "cs.AI",
    "num_papers": 5,
    "errors": [],
    "token_usage": {"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 0}
}

# After RetrieverAgent
{
    # ... original keys ...
    "papers": [Paper(...), Paper(...), ...],  # NEW
    "chunks": [PaperChunk(...), ...],          # NEW
    "token_usage": {"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 15000}  # UPDATED
}

# After AnalyzerAgent
{
    # ... all previous keys ...
    "analyses": [Analysis(...), Analysis(...), ...],  # NEW
    "token_usage": {
        "input_tokens": 12000,   # UPDATED
        "output_tokens": 3000,   # UPDATED
        "embedding_tokens": 15000
    }
}

# And so on through the pipeline...
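
The state shape implied by this flow could be written down roughly as follows. This is a sketch inferred from the keys shown above, not the project's actual `AgentState` definition; the name `AgentStateSketch` and the `initial_state` helper are assumptions, and later keys such as `synthesis` are left out.

```python
from typing import Any, List, Optional, TypedDict


class TokenUsage(TypedDict):
    input_tokens: int
    output_tokens: int
    embedding_tokens: int


class AgentStateSketch(TypedDict, total=False):
    # Provided by the user
    query: str
    category: Optional[str]
    num_papers: int
    # Added by agents as the pipeline progresses
    papers: List[Any]
    chunks: List[Any]
    analyses: List[Any]
    # Bookkeeping shared by all agents
    errors: List[str]
    token_usage: TokenUsage


def initial_state(query: str, category: Optional[str] = None, num_papers: int = 5) -> AgentStateSketch:
    """Build the starting state with empty bookkeeping fields."""
    return {
        "query": query,
        "category": category,
        "num_papers": num_papers,
        "errors": [],
        "token_usage": {"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 0},
    }


state = initial_state("What are recent advances in transformer architectures?", "cs.AI")
```

Using `total=False` lets keys such as `papers` be absent until the agent responsible for them has run.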

Dependency Injection Pattern

Agents receive their dependencies via constructor injection:

# agents/analyzer.py
import threading
from typing import Dict

class AnalyzerAgent:
    """Analyzes individual papers using RAG."""

    def __init__(
        self,
        rag_retriever,           # Injected dependency
        azure_openai_config: Dict[str, str],
        max_workers: int = 4,
        timeout: int = 60
    ):
        self.rag_retriever = rag_retriever
        self.client = self._initialize_client(azure_openai_config)
        self.max_workers = max_workers
        self.timeout = timeout
        self.consecutive_failures = 0
        self.max_consecutive_failures = 2
        self.token_lock = threading.Lock()

Benefits:

  • Testability: Easy to mock dependencies in tests
  • Flexibility: Different implementations can be injected (e.g., ArxivClient vs MCPArxivClient)
  • Clarity: Dependencies are explicit in constructor signature
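
To illustrate the testability benefit, the sketch below injects a fake retriever into a small consumer class. `FakeRetriever` and `ContextBuilder` are hypothetical; only the `retrieve(query, top_k, paper_ids)` call shape and the `chunks`/`chunk_ids` result keys are taken from the snippets in this document.

```python
from typing import Any, Dict, List


class FakeRetriever:
    """Test double that records calls and returns canned chunks."""

    def __init__(self, chunks: List[str]):
        self._chunks = chunks
        self.calls: List[Dict[str, Any]] = []

    def retrieve(self, query: str, top_k: int, paper_ids: List[str]) -> Dict[str, List[str]]:
        self.calls.append({"query": query, "top_k": top_k, "paper_ids": paper_ids})
        chunks = self._chunks[:top_k]
        return {"chunks": chunks, "chunk_ids": [f"chunk-{i}" for i in range(len(chunks))]}


class ContextBuilder:
    """Hypothetical consumer that receives its retriever via the constructor."""

    def __init__(self, rag_retriever):
        self.rag_retriever = rag_retriever

    def context_for(self, arxiv_id: str) -> str:
        result = self.rag_retriever.retrieve(query="methodology", top_k=2, paper_ids=[arxiv_id])
        return "\n".join(result["chunks"])


fake = FakeRetriever(["intro text", "methods text", "results text"])
context = ContextBuilder(fake).context_for("2401.00001")
```

No vector store or embedding API is touched, and the recorded calls let a test check that the consumer filtered by the right paper.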

Initialization in app.py:

# app.py:298-345
rag_retriever = RAGRetriever(vector_store=vector_store, embedding_generator=embedding_generator)
analyzer_agent = AnalyzerAgent(rag_retriever=rag_retriever, azure_openai_config=azure_config)
synthesis_agent = SynthesisAgent(rag_retriever=rag_retriever, azure_openai_config=azure_config)
citation_agent = CitationAgent(rag_retriever=rag_retriever)

LangGraph Integration Through Node Wrappers

Agents integrate with LangGraph through a node wrapper pattern:

# orchestration/nodes.py
from langfuse.decorators import observe

@observe(name="analyzer_agent", as_type="span")
def analyzer_node(state: AgentState, analyzer_agent) -> AgentState:
    """
    Node wrapper for AnalyzerAgent.

    Responsibilities:
    - LangFuse tracing (via @observe decorator)
    - Structured logging
    - Error handling (catch exceptions)
    - State transformation delegation
    """
    logger.info("Starting analyzer agent...")

    try:
        # Delegate to agent's run() method
        updated_state = analyzer_agent.run(state)

        logger.info(f"Analyzer completed. Analyses: {len(updated_state.get('analyses', []))}")
        return updated_state

    except Exception as e:
        logger.error(f"Analyzer node failed: {str(e)}", exc_info=True)
        state["errors"].append(f"Analyzer failed: {str(e)}")
        return state

Workflow Graph Definition:

# orchestration/workflow_graph.py:75-88
from langgraph.graph import StateGraph, END

workflow = StateGraph(AgentState)

# Add nodes (lambda binds agent instance to node wrapper)
workflow.add_node("retriever", lambda state: retriever_node(state, retriever_agent))
workflow.add_node("analyzer", lambda state: analyzer_node(state, analyzer_agent))
workflow.add_node("filter", filter_node)
workflow.add_node("synthesis", lambda state: synthesis_node(state, synthesis_agent))
workflow.add_node("citation", lambda state: citation_node(state, citation_agent))

# Define edges (execution flow)
# Note: the edges leaving "retriever" and "filter" are not part of this excerpt.
workflow.set_entry_point("retriever")
workflow.add_edge("analyzer", "filter")
workflow.add_edge("synthesis", "citation")
workflow.add_edge("citation", END)

Why Node Wrappers?

  1. Separation of Concerns: Agent logic stays pure, orchestration concerns in wrapper
  2. Automatic Tracing: @observe decorator applies to all agents uniformly
  3. Centralized Error Handling: Catch-all exception handling prevents workflow crashes
  4. Consistent Logging: Structured logs with same format across all agents
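
Since every wrapper has the same shape, one possible refactoring is a small factory that builds the wrapper for any agent. This is a sketch of that idea, not code from the repository: `make_node` and `BrokenAgent` are hypothetical names, and the `@observe` decorator is omitted to keep the example dependency-free.

```python
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)
State = Dict[str, Any]


def make_node(name: str, agent: Any) -> Callable[[State], State]:
    """Build a node function that logs, delegates to agent.run(), and never raises."""

    def node(state: State) -> State:
        logger.info("Starting %s...", name)
        try:
            return agent.run(state)
        except Exception as exc:
            logger.error("%s node failed: %s", name, exc, exc_info=True)
            state.setdefault("errors", []).append(f"{name} failed: {exc}")
            return state

    return node


class BrokenAgent:
    """Hypothetical agent that violates the contract by raising."""

    def run(self, state: State) -> State:
        raise RuntimeError("boom")


node = make_node("analyzer", BrokenAgent())
result = node({"query": "q", "errors": []})
```

A node built this way takes a single `state` argument, so it could be registered with `workflow.add_node(...)` in place of the lambda shown earlier.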

3. Individual Agent Deep Dives

RetrieverAgent

File: agents/retriever.py

Core Responsibilities:

  1. Search arXiv for papers matching user query and category
  2. Download PDFs via configurable clients (Direct API, Legacy MCP, FastMCP)
  3. Process PDFs into 500-token chunks with 50-token overlap
  4. Generate embeddings using Azure OpenAI text-embedding-3-small
  5. Store chunks in ChromaDB vector database

State Transformations:

# Input Keys
query = state.get("query")           # str: "What are recent advances in transformers?"
category = state.get("category")     # Optional[str]: "cs.AI"
num_papers = state.get("num_papers", 5)  # int: 5

# Output Keys (added to state)
state["papers"] = [Paper(...), ...]        # List[Paper]: Paper metadata
state["chunks"] = [PaperChunk(...), ...]   # List[PaperChunk]: Text chunks
state["token_usage"]["embedding_tokens"] = 15000  # Estimated tokens
state["errors"].append("Failed to download paper X")  # On partial failure

Dependencies:

def __init__(
    self,
    arxiv_client,        # ArxivClient | MCPArxivClient | FastMCPArxivClient
    pdf_processor,       # PDFProcessor
    embedding_generator, # EmbeddingGenerator
    vector_store,        # VectorStore (ChromaDB)
    fallback_client=None # Optional fallback client
):

Key Design Pattern: Two-Tier Fallback

# agents/retriever.py:69-97
def _search_with_fallback(
    self,
    query: str,
    max_results: int,
    category: Optional[str] = None
) -> List[Paper]:
    """Search with automatic fallback to secondary client."""

    # Try primary client (e.g., FastMCP)
    try:
        logger.info(f"Searching with primary client: {type(self.arxiv_client).__name__}")
        papers = self.arxiv_client.search_papers(
            query=query,
            max_results=max_results,
            category=category
        )
        if papers:
            return papers
        logger.warning("Primary client returned no results, trying fallback...")

    except Exception as e:
        logger.warning(f"Primary client failed: {str(e)}, trying fallback...")

    # Fallback to secondary client (e.g., Direct API)
    if self.fallback_client:
        try:
            logger.info(f"Searching with fallback client: {type(self.fallback_client).__name__}")
            return self.fallback_client.search_papers(
                query=query,
                max_results=max_results,
                category=category
            )
        except Exception as e:
            logger.error(f"Fallback client also failed: {str(e)}")
            return []

    return []

Why This Pattern?

  • Resilience: MCP servers may be unavailable; the fallback lets retrieval continue through a second client
  • Transparency: Logs show which client succeeded
  • Zero User Impact: Fallback is automatic and invisible
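
The same idea generalizes to any number of tiers. The helper below is an illustrative sketch, not part of the codebase: `first_non_empty` and the two demo sources are hypothetical names.

```python
from typing import Callable, List, Sequence, TypeVar

T = TypeVar("T")


def first_non_empty(sources: Sequence[Callable[[], List[T]]]) -> List[T]:
    """Try each source in order; return the first non-empty result, else []."""
    for source in sources:
        try:
            result = source()
        except Exception:
            continue  # This tier failed, move on to the next one
        if result:
            return result
    return []


def primary() -> List[str]:
    raise ConnectionError("MCP server unavailable")


def fallback() -> List[str]:
    return ["paper-1"]


papers = first_non_empty([primary, fallback])
nothing = first_non_empty([primary, primary])
```

As in `_search_with_fallback`, an empty list is the signal that every tier failed, so the caller decides how to report it.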

Key Design Pattern: Data Validation Filtering

# agents/retriever.py:198-242
def _validate_papers(self, papers: List[Paper]) -> List[Paper]:
    """Validate and filter papers to ensure Pydantic compliance."""

    valid_papers = []
    for paper in papers:
        try:
            # Ensure all list fields are actually lists
            if not isinstance(paper.authors, list):
                paper.authors = [paper.authors] if paper.authors else []
            if not isinstance(paper.categories, list):
                paper.categories = [paper.categories] if paper.categories else []

            # Re-validate with Pydantic
            validated_paper = Paper(**paper.model_dump())
            valid_papers.append(validated_paper)

        except Exception as e:
            logger.warning(f"Skipping invalid paper {paper.arxiv_id}: {str(e)}")
            continue

    logger.info(f"Validated {len(valid_papers)}/{len(papers)} papers")
    return valid_papers

Why This Pattern?

  • Defensive Programming: MCP servers may return malformed data
  • Partial Success: Continue with valid papers instead of failing completely
  • Type Safety: Ensures downstream agents can rely on Pydantic schemas

Error Handling Strategy:

# agents/retriever.py:249-302
@observe(name="retriever_agent_run", as_type="generation")
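def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    try:
        # Read inputs from state
        query = state.get("query")
        category = state.get("category")
        max_results = state.get("num_papers", 5)
        chunks = []  # Filled while PDFs are processed in Step 2

        # Step 1: Search (with fallback)
        papers = self._search_with_fallback(query, max_results, category)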
        if not papers:
            state["errors"].append("No papers found for query")
            return state  # Early return, no exception

        # Step 2: Download PDFs (continue on partial failures)
        for paper in papers:
            try:
                pdf_path = self._download_with_fallback(paper)
                # Process PDF...
            except Exception as e:
                logger.warning(f"Failed to process {paper.arxiv_id}: {str(e)}")
                continue  # Skip this paper, process others

        # Step 3: Generate embeddings (batch operation)
        try:
            embeddings = self.embedding_generator.generate_batch(chunks)
        except Exception as e:
            logger.error(f"Embedding generation failed: {str(e)}")
            state["errors"].append("Embedding generation failed")
            # Keep the partial results so downstream agents can see them
            state["papers"] = papers
            state["chunks"] = chunks
            return state  # Return papers/chunks without embeddings

        # Success: Return enriched state
        state["papers"] = papers
        state["chunks"] = chunks
        state["token_usage"]["embedding_tokens"] = len(chunks) * 300
        return state

    except Exception as e:
        logger.error(f"Retriever agent failed: {str(e)}", exc_info=True)
        state["errors"].append(f"Retriever failed: {str(e)}")
        return state  # Never raise

Observability Integration:

@observe(name="retriever_agent_run", as_type="generation")
  • Type: "generation" (includes embedding generation)
  • Trace Data: Search query, paper count, chunk count, embedding tokens
  • LangFuse View: Shows retrieval duration, embedding API calls

Critical File Paths:

  • agents/retriever.py:69-97 - Fallback search logic
  • agents/retriever.py:100-157 - Fallback download logic
  • agents/retriever.py:198-242 - Paper validation
  • agents/retriever.py:249-302 - Main run() method

AnalyzerAgent

File: agents/analyzer.py

Core Responsibilities:

  1. Analyze each paper individually using RAG context
  2. Execute 4 broad queries per paper for comprehensive coverage
  3. Call Azure OpenAI (GPT-4o-mini) with temperature=0 for deterministic JSON
  4. Extract methodology, findings, conclusions, limitations, contributions
  5. Calculate confidence scores based on context completeness

State Transformations:

# Input Keys
papers = state.get("papers", [])  # List[Paper] from RetrieverAgent

# Output Keys (added to state)
state["analyses"] = [Analysis(...), ...]  # List[Analysis]: One per paper
state["token_usage"]["input_tokens"] += 12000   # Cumulative prompt tokens
state["token_usage"]["output_tokens"] += 3000   # Cumulative completion tokens
state["errors"].append("Failed to analyze paper X")  # On failure

Dependencies:

def __init__(
    self,
    rag_retriever,       # RAGRetriever: Semantic search + context formatting
    azure_openai_config: Dict[str, str],
    max_workers: int = 4,   # Parallel analysis threads
    timeout: int = 60       # LLM call timeout
):

Key Design Pattern: Parallel Processing with Circuit Breaker

# agents/analyzer.py:333-359
from concurrent.futures import ThreadPoolExecutor, as_completed

def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    papers = state.get("papers", [])
    analyses = []

    # Reset circuit breaker
    self.consecutive_failures = 0

    # Parallel processing with ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        future_to_paper = {
            executor.submit(self.analyze_paper, paper): paper
            for paper in papers
        }

        for future in as_completed(future_to_paper):
            # Circuit breaker check
            if self.consecutive_failures >= self.max_consecutive_failures:
                logger.error(f"Circuit breaker triggered after {self.consecutive_failures} failures")
                break

            paper = future_to_paper[future]
            try:
                analysis = future.result()

                if analysis.confidence_score > 0:
                    analyses.append(analysis)
                    self.consecutive_failures = 0  # Reset on success
                else:
                    self.consecutive_failures += 1

            except Exception as e:
                logger.error(f"Analysis failed for {paper.arxiv_id}: {str(e)}")
                self.consecutive_failures += 1

    state["analyses"] = analyses
    return state

Why This Pattern?

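  • Throughput: Analyzes up to 4 papers concurrently (max_workers=4)
  • Circuit Breaker: Stops collecting results after 2 consecutive failures (counted in completion order). Futures that were already submitted still run when the executor shuts down, so they must be cancelled to actually save API calls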
  • Thread Safety: self.token_lock protects shared token counter
  • Graceful Degradation: Partial analyses returned even if some papers fail

Key Design Pattern: Comprehensive RAG Queries

# agents/analyzer.py:208-252
def _retrieve_comprehensive_context(self, paper: Paper, top_k: int = 10) -> Tuple[str, List[str]]:
    """
    Retrieve chunks using multiple broad queries to ensure full coverage.
    """
    # 4 broad queries to cover different aspects
    queries = [
        "methodology approach methods experimental setup techniques",
        "results findings data experiments performance evaluation",
        "conclusions contributions implications significance impact",
        "limitations future work challenges open problems directions"
    ]

    all_chunks = []
    all_chunk_ids = []

    # Retrieve top_k // 4 chunks per query (2 per query, so at most 8 chunks with the default top_k=10)
    chunks_per_query = max(1, top_k // len(queries))

    for query in queries:
        result = self.rag_retriever.retrieve(
            query=query,
            top_k=chunks_per_query,
            paper_ids=[paper.arxiv_id]  # Filter to this paper only
        )
        all_chunks.extend(result["chunks"])
        all_chunk_ids.extend(result["chunk_ids"])

    # Deduplicate by chunk_id
    seen = set()
    unique_chunks = []
    unique_ids = []

    for chunk, chunk_id in zip(all_chunks, all_chunk_ids):
        if chunk_id not in seen:
            seen.add(chunk_id)
            unique_chunks.append(chunk)
            unique_ids.append(chunk_id)

    # Format context with metadata
    context = self.rag_retriever.format_context(unique_chunks)
    return context, unique_ids

Why This Pattern?

  • Comprehensive Coverage: A single query can miss whole sections (e.g., a "methods" query misses conclusions)
  • Semantic Diversity: Broad queries capture different aspects of the paper
  • Deduplication: Prevents redundant chunks from multiple queries
  • Filtered Search: paper_ids ensures we only retrieve from current paper
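
The per-query budget and the effect of deduplication can be checked with a toy retriever. Everything below is a simplified stand-in: `fake_retrieve` is hypothetical and returns one chunk shared by all queries plus query-specific chunks.

```python
from typing import Dict, List, Tuple

QUERIES = ["methodology", "results", "conclusions", "limitations"]


def fake_retrieve(query: str, top_k: int) -> Dict[str, List[str]]:
    """Hypothetical retriever: a shared chunk first, then query-specific ones."""
    ids = (["c-shared"] + [f"c-{query}-{i}" for i in range(top_k)])[:top_k]
    return {"chunk_ids": ids, "chunks": [f"text of {cid}" for cid in ids]}


def gather_unique_ids(top_k: int = 10) -> Tuple[List[str], int]:
    """Run every query with an equal share of top_k and deduplicate by chunk id."""
    per_query = max(1, top_k // len(QUERIES))
    seen = set()
    unique_ids: List[str] = []
    for query in QUERIES:
        for chunk_id in fake_retrieve(query, per_query)["chunk_ids"]:
            if chunk_id not in seen:
                seen.add(chunk_id)
                unique_ids.append(chunk_id)
    return unique_ids, per_query


unique_ids, per_query = gather_unique_ids()
```

With `top_k=10` each query gets 2 chunks, and because the shared chunk is counted once, the 8 retrieved chunks collapse to 5 unique ones here.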

Key Design Pattern: LLM Response Normalization

# agents/analyzer.py:107-178
def _normalize_analysis_response(self, data: dict) -> dict:
    """
    Normalize malformed LLM responses to match Pydantic schema.

    Common issues:
    - Nested lists: ["finding 1", ["finding 2", "finding 3"]]
    - None values in lists: [None, "valid finding"]
    - Mixed types: [123, "text", {"key": "value"}]
    """
    def flatten_and_clean(value):
        """Recursively flatten nested lists and convert to strings."""
        if value is None:
            return ""
        elif isinstance(value, list):
            flattened = []
            for item in value:
                cleaned = flatten_and_clean(item)
                if isinstance(cleaned, list):
                    flattened.extend(cleaned)
                elif cleaned:  # Skip empty strings
                    flattened.append(cleaned)
            return flattened
        else:
            # dicts, numbers, bools, and strings are all stringified
            return str(value)

    # Normalize all list fields
    normalized = {}
    list_fields = ["methodology", "key_findings", "conclusions", "limitations", "contributions"]

    for field in list_fields:
        if field in data:
            cleaned = flatten_and_clean(data[field])
            # A scalar becomes a one-item list; None/empty becomes an empty list
            normalized[field] = cleaned if isinstance(cleaned, list) else ([cleaned] if cleaned else [])
        else:
            normalized[field] = []

    # Preserve scalar fields
    normalized["confidence_score"] = float(data.get("confidence_score", 0.0))
    normalized["arxiv_id"] = data.get("arxiv_id", "")
    normalized["title"] = data.get("title", "")

    return normalized

Why This Pattern?

  • LLM Output Drift: GPT-4o-mini occasionally returns valid JSON whose shape does not match the schema (nested lists, nulls, mixed types)
  • Defensive Parsing: Prevents Pydantic validation errors
  • Data Salvage: Extracts valid data even from malformed responses
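
A compact way to see the intended behavior is a standalone variant of the flattening helper that always returns a list. `flatten_to_strings` is a simplified sketch for illustration, not the function from `agents/analyzer.py`.

```python
from typing import Any, List


def flatten_to_strings(value: Any) -> List[str]:
    """Flatten arbitrarily nested lists into a flat list of non-empty strings."""
    if value is None:
        return []
    if isinstance(value, list):
        flat: List[str] = []
        for item in value:
            flat.extend(flatten_to_strings(item))
        return flat
    text = str(value)
    return [text] if text else []


messy = ["finding 1", ["finding 2", None, ["finding 3"]], 123, ""]
clean = flatten_to_strings(messy)
```

Here `clean` is `["finding 1", "finding 2", "finding 3", "123"]`: nesting is removed, `None` and empty strings are dropped, and the number is stringified.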

Error Handling Strategy:

# agents/analyzer.py:260-325
def analyze_paper(self, paper: Paper) -> Analysis:
    """Analyze a single paper (called by ThreadPoolExecutor)."""
    try:
        # Step 1: Retrieve context via RAG
        context, chunk_ids = self._retrieve_comprehensive_context(paper)

        # Step 2: Call LLM with structured prompt
        # (building `prompt` from `context` is omitted in this excerpt)
        response = self.client.chat.completions.create(
            model=self.deployment_name,
            messages=[
                {"role": "system", "content": "You are a research paper analyzer..."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.0,  # Deterministic
            response_format={"type": "json_object"},  # Force JSON
            max_tokens=2000,
            timeout=self.timeout
        )

        # Step 3: Parse and normalize response
        data = json.loads(response.choices[0].message.content)
        normalized = self._normalize_analysis_response(data)

        # Step 4: Create Pydantic model
        analysis = Analysis(**normalized)

        # Step 5: Track tokens (thread-safe)
        with self.token_lock:
            self.total_input_tokens += response.usage.prompt_tokens
            self.total_output_tokens += response.usage.completion_tokens

        return analysis

    except Exception as e:
        logger.error(f"Failed to analyze {paper.arxiv_id}: {str(e)}", exc_info=True)
        # Return minimal analysis with confidence=0.0
        return Analysis(
            arxiv_id=paper.arxiv_id,
            title=paper.title,
            methodology=[],
            key_findings=[],
            conclusions=[],
            limitations=[],
            contributions=[],
            confidence_score=0.0
        )

Observability Integration:

@observe(name="analyzer_agent_run", as_type="generation")
  • Type: "generation" (LLM-heavy workload)
  • Trace Data: Paper count, analysis count, token usage, parallel execution
  • LangFuse View: Shows individual LLM calls via langfuse-openai instrumentation

Critical File Paths:

  • agents/analyzer.py:107-178 - Response normalization
  • agents/analyzer.py:208-252 - Comprehensive RAG queries
  • agents/analyzer.py:260-325 - Single paper analysis
  • agents/analyzer.py:333-359 - Parallel processing with circuit breaker

SynthesisAgent

File: agents/synthesis.py

Core Responsibilities:

  1. Compare findings across all analyzed papers
  2. Identify consensus points (where papers agree)
  3. Identify contradictions (where papers disagree)
  4. Identify research gaps (what's missing)
  5. Generate executive summary addressing user's original query

State Transformations:

# Input Keys
papers = state.get("papers", [])        # List[Paper]
analyses = state.get("analyses", [])    # List[Analysis] from AnalyzerAgent
query = state.get("query")              # Original user question

# Output Keys (added to state)
state["synthesis"] = SynthesisResult(
    consensus_points=[ConsensusPoint(...), ...],
    contradictions=[Contradiction(...), ...],
    research_gaps=["Gap 1", "Gap 2", ...],
    summary="Executive summary addressing user query...",
    papers_analyzed=["arxiv_id_1", "arxiv_id_2", ...],
    confidence_score=0.85
)
state["token_usage"]["input_tokens"] += 8000
state["token_usage"]["output_tokens"] += 2000

Dependencies:

def __init__(
    self,
    rag_retriever,       # RAGRetriever (passed but not actively used)
    azure_openai_config: Dict[str, str],
    timeout: int = 90    # Longer timeout for synthesis (more complex task)
):

Key Design Pattern: Cross-Paper Synthesis Prompt

# agents/synthesis.py:54-133
def _create_synthesis_prompt(
    self,
    query: str,
    papers: List[Paper],
    analyses: List[Analysis]
) -> str:
    """
    Create structured prompt for cross-paper synthesis.
    """
    # Format all analyses into structured summaries
    paper_summaries = []
    for paper, analysis in zip(papers, analyses):
        summary = f"""
[Paper {paper.arxiv_id}]
Title: {paper.title}
Authors: {', '.join(paper.authors[:3])}...
Published: {paper.published}

Methodology: {' | '.join(analysis.methodology[:3])}
Key Findings: {' | '.join(analysis.key_findings[:3])}
Conclusions: {' | '.join(analysis.conclusions[:2])}
Limitations: {' | '.join(analysis.limitations[:2])}
Contributions: {' | '.join(analysis.contributions[:2])}
"""
        paper_summaries.append(summary)

    # Synthesis prompt
    prompt = f"""
You are synthesizing findings from {len(papers)} research papers to answer this question:
"{query}"

# Paper Summaries
{chr(10).join(paper_summaries)}

# Task
Analyze the papers above and provide:

1. **Consensus Points**: What do multiple papers agree on?
   - For each consensus point, list supporting papers (use arxiv_id)
   - Provide evidence from the papers

2. **Contradictions**: Where do papers disagree or present conflicting findings?
   - Describe the contradiction clearly
   - List papers on each side (papers_a, papers_b)

3. **Research Gaps**: What questions remain unanswered? What future directions are suggested?

4. **Summary**: A concise executive summary (2-3 paragraphs) answering the user's original question

Return as JSON:
{{
    "consensus_points": [
        {{
            "point": "Description of consensus",
            "supporting_papers": ["arxiv_id_1", "arxiv_id_2"],
            "evidence": "Evidence from papers"
        }}
    ],
    "contradictions": [
        {{
            "description": "Description of contradiction",
            "papers_a": ["arxiv_id_1"],
            "papers_b": ["arxiv_id_2"],
            "context": "Additional context"
        }}
    ],
    "research_gaps": ["Gap 1", "Gap 2", ...],
    "summary": "Executive summary here...",
    "confidence_score": 0.85
}}
"""
    return prompt

Why This Pattern?

  • Structured Input: LLM receives formatted summaries for all papers
  • Explicit Citations: Requires grounding claims in specific papers
  • JSON Template: Requests structured output that can be validated with Pydantic
  • Comprehensive Analysis: Covers consensus, contradictions, gaps, and summary

Key Design Pattern: Nested Data Normalization

# agents/synthesis.py:135-196
def _normalize_synthesis_response(self, data: dict) -> dict:
    """
    Normalize nested structures in synthesis response.

    Handles:
    - consensus_points[].supporting_papers (list)
    - consensus_points[].citations (list)
    - contradictions[].papers_a (list)
    - contradictions[].papers_b (list)
    - research_gaps (list)
    """
    def ensure_list_of_strings(value):
        if value is None:
            return []
        if isinstance(value, str):
            return [value]
        if isinstance(value, list):
            return [str(item) for item in value if item]
        return [str(value)]

    normalized = {
        "consensus_points": [],
        "contradictions": [],
        "research_gaps": ensure_list_of_strings(data.get("research_gaps", [])),
        "summary": str(data.get("summary", "")),
        "confidence_score": float(data.get("confidence_score", 0.0))
    }

    # Normalize consensus points
    for cp in data.get("consensus_points", []):
        normalized["consensus_points"].append({
            "point": str(cp.get("point", "")),
            "supporting_papers": ensure_list_of_strings(cp.get("supporting_papers", [])),
            "evidence": str(cp.get("evidence", "")),
            "citations": ensure_list_of_strings(cp.get("citations", []))
        })

    # Normalize contradictions
    for contr in data.get("contradictions", []):
        normalized["contradictions"].append({
            "description": str(contr.get("description", "")),
            "papers_a": ensure_list_of_strings(contr.get("papers_a", [])),
            "papers_b": ensure_list_of_strings(contr.get("papers_b", [])),
            "context": str(contr.get("context", "")),
            "citations": ensure_list_of_strings(contr.get("citations", []))
        })

    return normalized

Why This Pattern?

  • Nested Schema Complexity: ConsensusPoint and Contradiction have nested lists
  • LLM Inconsistency: May return strings instead of lists for single items
  • Defensive Parsing: Ensures Pydantic validation succeeds

Error Handling Strategy:

# agents/synthesis.py:242-310
@observe(name="synthesis_agent_run", as_type="generation")
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    try:
        papers = state.get("papers", [])
        analyses = state.get("analyses", [])
        query = state.get("query", "")

        # Handle paper count mismatch (defensive)
        if len(papers) != len(analyses):
            logger.warning(f"Paper count mismatch: {len(papers)} papers, {len(analyses)} analyses")
            # Use minimum length to avoid index errors
            min_len = min(len(papers), len(analyses))
            papers = papers[:min_len]
            analyses = analyses[:min_len]

        # Create synthesis prompt
        prompt = self._create_synthesis_prompt(query, papers, analyses)

        # Call LLM
        response = self.client.chat.completions.create(
            model=self.deployment_name,
            messages=[
                {"role": "system", "content": "You are a research synthesis expert..."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.0,
            response_format={"type": "json_object"},
            max_tokens=3000,
            timeout=self.timeout
        )

        # Parse and normalize
        data = json.loads(response.choices[0].message.content)
        normalized = self._normalize_synthesis_response(data)

        # Add paper IDs
        normalized["papers_analyzed"] = [p.arxiv_id for p in papers]

        # Create Pydantic model
        synthesis = SynthesisResult(**normalized)

        # Update state
        state["synthesis"] = synthesis
        state["token_usage"]["input_tokens"] += response.usage.prompt_tokens
        state["token_usage"]["output_tokens"] += response.usage.completion_tokens

        return state

    except Exception as e:
        logger.error(f"Synthesis failed: {str(e)}", exc_info=True)

        # Return minimal synthesis with confidence=0.0
        papers_analyzed = [p.arxiv_id for p in state.get("papers", [])]
        state["synthesis"] = SynthesisResult(
            consensus_points=[],
            contradictions=[],
            research_gaps=[],
            summary=f"Synthesis failed: {str(e)}",
            papers_analyzed=papers_analyzed,
            confidence_score=0.0
        )
        state["errors"].append(f"Synthesis failed: {str(e)}")
        return state

Observability Integration:

@observe(name="synthesis_agent_run", as_type="generation")
  • Type: "generation" (single LLM call for cross-paper analysis)
  • Trace Data: Paper count, synthesis complexity, token usage
  • LangFuse View: Shows synthesis LLM call with full prompt/completion

Critical File Paths:

  • agents/synthesis.py:54-133 - Synthesis prompt creation
  • agents/synthesis.py:135-196 - Nested data normalization
  • agents/synthesis.py:242-310 - Main run() method with error handling

CitationAgent

File: agents/citation.py

Core Responsibilities:

  1. Generate APA-formatted citations for all papers
  2. Validate synthesis claims against source papers
  3. Calculate cost estimates using dynamic pricing configuration
  4. Create final ValidatedOutput with all metadata

State Transformations:

# Input Keys
synthesis = state.get("synthesis")      # SynthesisResult from SynthesisAgent
papers = state.get("papers", [])        # List[Paper]
token_usage = state.get("token_usage", {})
model_desc = state.get("model_desc", {})

# Output Keys (added to state)
state["validated_output"] = ValidatedOutput(
    synthesis=synthesis,
    citations=[Citation(...), ...],
    retrieved_chunks=[chunk_id_1, chunk_id_2, ...],
    token_usage=token_usage,
    cost_estimate=0.0234,  # USD
    processing_time=12.5   # seconds
)

Dependencies:

def __init__(
    self,
    rag_retriever  # RAGRetriever (injected but not actively used)
):

Key Design Pattern: APA Citation Formatting

# agents/citation.py:31-61
def _format_apa_citation(self, paper: Paper) -> str:
    """
    Format paper in APA style.

    Format: Authors. (Year). Title. arXiv:ID. URL
    """
    # Handle different author counts
    if len(paper.authors) == 1:
        author_str = paper.authors[0]
    elif len(paper.authors) == 2:
        author_str = f"{paper.authors[0]} & {paper.authors[1]}"
    else:
        # 3+ authors: List all with ampersand before last
        author_str = ", ".join(paper.authors[:-1]) + f", & {paper.authors[-1]}"

    # Extract year from published date (format: "2024-01-15T10:30:00Z")
    year = paper.published.split("-")[0] if paper.published else "n.d."

    # Format citation
    citation = (
        f"{author_str}. ({year}). {paper.title}. "
        f"arXiv:{paper.arxiv_id}. {paper.arxiv_url}"
    )

    return citation

Why This Pattern?

  • Academic Standard: APA is a widely recognized format
  • Consistent Formatting: Handles 1, 2, or many authors uniformly
  • Traceability: Includes arXiv ID and URL for easy lookup
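The author-joining rules can be exercised in isolation. The sketch below restates the branching of `_format_apa_citation` as a standalone function. The helper name `format_authors`, the sample names, and the empty-list fallback are illustrative assumptions and are not part of agents/citation.py.

```python
from typing import List


def format_authors(authors: List[str]) -> str:
    """Join author names using the same 1 / 2 / 3+ branching (hypothetical helper)."""
    if not authors:
        # Assumption: the original shows no empty-list branch; this guard avoids an IndexError.
        return "Unknown"
    if len(authors) == 1:
        return authors[0]
    if len(authors) == 2:
        return f"{authors[0]} & {authors[1]}"
    # 3+ authors: comma-separated, ampersand before the last name
    return ", ".join(authors[:-1]) + f", & {authors[-1]}"


print(format_authors(["Lee, A."]))                          # Lee, A.
print(format_authors(["Lee, A.", "Kim, B."]))               # Lee, A. & Kim, B.
print(format_authors(["Lee, A.", "Kim, B.", "Park, C."]))   # Lee, A., Kim, B., & Park, C.
```

The empty-list guard matters because the three-or-more branch indexes `authors[-1]`, which would fail on a paper with no author metadata.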

Key Design Pattern: Synthesis Validation

# agents/citation.py:90-134
def validate_synthesis(
    self,
    synthesis: SynthesisResult,
    papers: List[Paper]
) -> Dict[str, Any]:
    """
    Validate synthesis claims against source papers.

    Returns:
    - total_consensus_points: int
    - total_contradictions: int
    - referenced_papers: List[str] (arxiv IDs mentioned)
    - chunk_ids: List[str] (chunks used for grounding)
    """
    validation_data = {
        "total_consensus_points": len(synthesis.consensus_points),
        "total_contradictions": len(synthesis.contradictions),
        "referenced_papers": [],
        "chunk_ids": []
    }

    # Collect all referenced paper IDs
    for cp in synthesis.consensus_points:
        validation_data["referenced_papers"].extend(cp.supporting_papers)
        validation_data["chunk_ids"].extend(cp.citations)

    for contr in synthesis.contradictions:
        validation_data["referenced_papers"].extend(contr.papers_a)
        validation_data["referenced_papers"].extend(contr.papers_b)
        validation_data["chunk_ids"].extend(contr.citations)

    # Deduplicate
    validation_data["referenced_papers"] = list(set(validation_data["referenced_papers"]))
    validation_data["chunk_ids"] = list(set(validation_data["chunk_ids"]))

    logger.info(
        f"Validation: {validation_data['total_consensus_points']} consensus points, "
        f"{validation_data['total_contradictions']} contradictions, "
        f"{len(validation_data['referenced_papers'])} papers referenced"
    )

    return validation_data

Why This Pattern?

  • Traceability: Tracks which papers are actually cited
  • Metadata Extraction: Chunk IDs for provenance tracking
  • Quality Signal: High citation count indicates well-grounded synthesis
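One possible use of the collected `referenced_papers` is to compare them with the papers that were actually retrieved. This is a minimal sketch; `find_unknown_references` and the placeholder IDs are hypothetical and do not appear in agents/citation.py.

```python
from typing import Iterable, List


def find_unknown_references(referenced: Iterable[str], retrieved: Iterable[str]) -> List[str]:
    """Return referenced arXiv IDs that are not among the retrieved papers, sorted."""
    return sorted(set(referenced) - set(retrieved))


# Placeholder IDs for illustration only
referenced_papers = ["0000.00001", "0000.00002", "0000.00009"]
retrieved_ids = ["0000.00001", "0000.00002", "0000.00003"]

unknown = find_unknown_references(referenced_papers, retrieved_ids)
print(unknown)  # ['0000.00009']
```

A non-empty result would suggest that the synthesis cites a paper outside the retrieved set, which may be worth logging as a warning.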

Key Design Pattern: Dynamic Cost Calculation

# agents/citation.py:164-183
def calculate_cost(
    self,
    token_usage: Dict[str, int],
    model_desc: Dict[str, str]
) -> float:
    """
    Calculate cost estimate using dynamic pricing from config.
    """
    from utils.config import get_pricing_config

    pricing_config = get_pricing_config()

    # Get model-specific pricing
    llm_model = model_desc.get("llm_model", "gpt-4o-mini")
    embedding_model = model_desc.get("embedding_model", "text-embedding-3-small")

    llm_pricing = pricing_config.get_model_pricing(llm_model)
    embedding_pricing = pricing_config.get_embedding_pricing(embedding_model)

    # Calculate costs (pricing is per 1M tokens)
    input_cost = (token_usage.get("input_tokens", 0) / 1_000_000) * llm_pricing["input"]
    output_cost = (token_usage.get("output_tokens", 0) / 1_000_000) * llm_pricing["output"]
    embedding_cost = (token_usage.get("embedding_tokens", 0) / 1_000_000) * embedding_pricing

    total_cost = input_cost + output_cost + embedding_cost
    return round(total_cost, 4)

Why This Pattern?

  • Centralized Pricing: Single source of truth in utils/config.py
  • Model Flexibility: Supports any Azure OpenAI model (falls back to defaults)
  • Transparency: Computes input, output, and embedding costs separately before summing them
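A worked example makes the per-1M-token arithmetic concrete. The prices below are hypothetical values chosen for illustration, not the contents of utils/config.py, and `estimate_cost` is a simplified stand-in for `calculate_cost`.

```python
from typing import Dict

# Hypothetical per-1M-token prices in USD, for illustration only
LLM_PRICING = {"input": 0.15, "output": 0.60}
EMBEDDING_PRICE = 0.02


def estimate_cost(token_usage: Dict[str, int]) -> float:
    """Mirror the arithmetic of calculate_cost with fixed, assumed prices."""
    input_cost = token_usage.get("input_tokens", 0) / 1_000_000 * LLM_PRICING["input"]
    output_cost = token_usage.get("output_tokens", 0) / 1_000_000 * LLM_PRICING["output"]
    embedding_cost = token_usage.get("embedding_tokens", 0) / 1_000_000 * EMBEDDING_PRICE
    return round(input_cost + output_cost + embedding_cost, 4)


usage = {"input_tokens": 12_000, "output_tokens": 3_000, "embedding_tokens": 30_000}
# 0.012 * 0.15 + 0.003 * 0.60 + 0.030 * 0.02 = 0.0018 + 0.0018 + 0.0006
print(estimate_cost(usage))  # 0.0042
```

Because the result is rounded to four decimal places, a run costing less than $0.00005 is reported as 0.0.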

Error Handling Strategy:

# agents/citation.py:200-254
@observe(name="citation_agent_run", as_type="span")
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Note: Citation agent rarely fails (pure data transformation).
    No complex error handling needed.
    """
    try:
        synthesis = state.get("synthesis")
        papers = state.get("papers", [])
        token_usage = state.get("token_usage", {})
        model_desc = state.get("model_desc", {})
        start_time = state.get("start_time", time.time())

        # Generate citations
        citations = []
        for paper in papers:
            citation_text = self._format_apa_citation(paper)
            citations.append(Citation(
                arxiv_id=paper.arxiv_id,
                citation_text=citation_text
            ))

        # Validate synthesis
        validation_data = self.validate_synthesis(synthesis, papers)

        # Calculate cost and timing
        cost_estimate = self.calculate_cost(token_usage, model_desc)
        processing_time = time.time() - start_time

        # Create final output
        validated_output = ValidatedOutput(
            synthesis=synthesis,
            citations=citations,
            retrieved_chunks=validation_data["chunk_ids"],
            token_usage=token_usage,
            cost_estimate=cost_estimate,
            processing_time=round(processing_time, 2)
        )

        state["validated_output"] = validated_output
        logger.info(
            f"Citation agent completed. Cost: ${cost_estimate:.4f}, "
            f"Time: {processing_time:.2f}s"
        )

        return state

    except Exception as e:
        logger.error(f"Citation agent failed: {str(e)}", exc_info=True)
        state["errors"].append(f"Citation failed: {str(e)}")
        return state

Observability Integration:

@observe(name="citation_agent_run", as_type="span")
  • Type: "span" (data processing only, no LLM calls)
  • Trace Data: Citation count, cost estimate, processing time
  • LangFuse View: Shows data transformation duration

Critical File Paths:

  • agents/citation.py:31-61 - APA citation formatting
  • agents/citation.py:90-134 - Synthesis validation
  • agents/citation.py:164-183 - Dynamic cost calculation
  • agents/citation.py:200-254 - Main run() method

4. Cross-Cutting Patterns

State Management

AgentState TypedDict

All workflow state is managed through a strongly-typed dictionary defined in utils/langgraph_state.py:

from typing import TypedDict, List, Dict, Optional, Any
from utils.schemas import Paper, PaperChunk, Analysis, SynthesisResult, ValidatedOutput

class AgentState(TypedDict, total=False):
    # Input fields (from user)
    query: str
    category: Optional[str]
    num_papers: int

    # Agent outputs
    papers: List[Paper]
    chunks: List[PaperChunk]
    analyses: List[Analysis]
    filtered_analyses: List[Analysis]  # After filter node
    synthesis: SynthesisResult
    validated_output: ValidatedOutput

    # Metadata
    errors: List[str]
    token_usage: Dict[str, int]  # {input_tokens, output_tokens, embedding_tokens}
    start_time: float
    processing_time: float
    model_desc: Dict[str, str]  # {llm_model, embedding_model}

    # Tracing
    trace_id: Optional[str]
    session_id: Optional[str]
    user_id: Optional[str]

Key Benefits:

  • Type Safety: IDEs provide autocomplete and type checking
  • Documentation: State shape is self-documenting
  • Schema Definition: LangGraph builds its state channels from this schema; TypedDict hints are checked by static type checkers and are not enforced at runtime
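A TypedDict is an ordinary dict at runtime, so Python itself does not reject undeclared or misspelled keys. The sketch below shows one way to catch them in a test or debug helper. `MiniState` and `unknown_keys` are hypothetical names used only for this illustration.

```python
from typing import Any, Dict, List, TypedDict


class MiniState(TypedDict, total=False):
    """Cut-down stand-in for AgentState (illustrative only)."""
    query: str
    num_papers: int
    errors: List[str]


def unknown_keys(state: Dict[str, Any]) -> List[str]:
    """Return keys that are not declared on MiniState."""
    declared = set(MiniState.__annotations__)
    return sorted(k for k in state if k not in declared)


# The misspelled key "num_paper" is accepted silently by the dict itself.
state: Dict[str, Any] = {"query": "transformers", "num_paper": 5}
print(unknown_keys(state))  # ['num_paper']
```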

Serialization Requirements (msgpack)

CRITICAL: LangGraph uses msgpack for state checkpointing. Only these types are allowed in state:

✅ Allowed:

# Primitives
state["query"] = "transformer architectures"  # str
state["num_papers"] = 5                       # int
state["processing_time"] = 12.5               # float
state["enabled"] = True                       # bool
state["optional_field"] = None                # None

# Collections
state["errors"] = ["Error 1", "Error 2"]      # list
state["token_usage"] = {"input": 1000}        # dict

# Pydantic models: store the objects directly, without a manual .model_dump()
state["papers"] = [paper.model_dump() for paper in papers]  # WRONG (manual dump not needed)
state["papers"] = papers  # CORRECT (LangGraph serializes automatically)

❌ Prohibited:

# Complex objects
state["progress"] = gr.Progress()        # ❌ Gradio components
state["file"] = open("data.txt")         # ❌ File handles
state["thread"] = threading.Thread()     # ❌ Thread objects
state["callback"] = lambda x: x          # ❌ Functions/callbacks

Real Bug Example (from BUGFIX_MSGPACK_SERIALIZATION.md):

# BEFORE (broken)
def run_workflow(workflow_app, initial_state, config, progress):
    initial_state["progress"] = progress  # ❌ Non-serializable
    final_state = workflow_app.invoke(initial_state, config)
    # CRASH: TypeError: can't serialize gr.Progress

# AFTER (fixed)
def run_workflow(workflow_app, initial_state, config, progress):
    # Keep progress as local variable, NOT in state
    final_state = initial_state
    # stream_mode="values" yields the full state after each step,
    # so the last event is the final state
    for event in workflow_app.stream(initial_state, config, stream_mode="values"):
        final_state = event
        # Update progress using local variable
        if progress:
            progress(0.5, desc="Processing...")
    return final_state
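A lightweight pre-flight check can surface this class of bug before the workflow is invoked. The sketch below uses only the standard library. `find_unserializable` is a hypothetical helper, and its treatment of objects with a `model_dump` method as safe is an assumption about how the checkpointer handles Pydantic models.

```python
from typing import Any, List

_SAFE_SCALARS = (str, int, float, bool, type(None))


def find_unserializable(value: Any, path: str = "state") -> List[str]:
    """Return paths of values that are not primitives, lists, dicts, or model-like objects."""
    if isinstance(value, _SAFE_SCALARS):
        return []
    if isinstance(value, dict):
        return [p for k, v in value.items() for p in find_unserializable(v, f"{path}[{k!r}]")]
    if isinstance(value, (list, tuple)):
        return [p for i, v in enumerate(value) for p in find_unserializable(v, f"{path}[{i}]")]
    if hasattr(value, "model_dump"):
        # Assumption: Pydantic-style models are handled by the checkpointer's serializer
        return []
    return [path]


bad_state = {"query": "transformers", "errors": [], "callback": lambda x: x}
print(find_unserializable(bad_state))  # ["state['callback']"]
```

Running a check like this on `initial_state` in a unit test would flag a stray callback or UI component by its path.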

Token Usage Tracking Pattern

All agents update the shared token_usage dictionary:

# Initialize in create_initial_state() (utils/langgraph_state.py:46-91)
initial_state["token_usage"] = {
    "input_tokens": 0,
    "output_tokens": 0,
    "embedding_tokens": 0
}

# RetrieverAgent updates embedding tokens
state["token_usage"]["embedding_tokens"] = len(chunks) * 300  # Estimate

# AnalyzerAgent updates LLM tokens (thread-safe)
with self.token_lock:
    self.total_input_tokens += response.usage.prompt_tokens
    self.total_output_tokens += response.usage.completion_tokens

# After all analyses
state["token_usage"]["input_tokens"] = self.total_input_tokens
state["token_usage"]["output_tokens"] = self.total_output_tokens

# SynthesisAgent accumulates (+=, not =)
state["token_usage"]["input_tokens"] += response.usage.prompt_tokens
state["token_usage"]["output_tokens"] += response.usage.completion_tokens

# CitationAgent reads final totals
cost_estimate = self.calculate_cost(state["token_usage"], model_desc)

Why This Pattern?

  • Centralized Tracking: Single source of truth for token usage
  • Cost Transparency: Users see token consumption per run (LLM counts come from the API response; embedding counts are estimated at 300 tokens per chunk)
  • Performance Monitoring: Track token usage trends over time
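The difference between assigning and accumulating matters once more than one agent calls the LLM: an agent that assigns would discard counts written by earlier agents. A small helper can make accumulation the default. This is a sketch only; `add_usage` is a hypothetical name and the token counts are made up.

```python
from typing import Dict


def add_usage(token_usage: Dict[str, int], input_tokens: int, output_tokens: int) -> None:
    """Accumulate LLM token counts in place instead of overwriting them."""
    token_usage["input_tokens"] = token_usage.get("input_tokens", 0) + input_tokens
    token_usage["output_tokens"] = token_usage.get("output_tokens", 0) + output_tokens


token_usage = {"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 0}
add_usage(token_usage, 8_000, 1_500)  # e.g. totals from the analysis stage
add_usage(token_usage, 4_000, 900)    # e.g. the synthesis call
print(token_usage)  # {'input_tokens': 12000, 'output_tokens': 2400, 'embedding_tokens': 0}
```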

Error Handling Philosophy

The Golden Rule: Never Raise from run()

All agents follow this contract:

def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    try:
        # Agent logic here
        return state
    except Exception as e:
        logger.error(f"Agent failed: {str(e)}", exc_info=True)
        state["errors"].append(f"Agent failed: {str(e)}")
        return state  # NEVER raise

Why?

  • Workflow Resilience: One agent's failure doesn't crash entire pipeline
  • Partial Results: Downstream agents can work with available data
  • Debugging: Errors collected in state for tracing
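The contract can also be expressed once as a decorator instead of being repeated in every agent. The sketch below works on plain `state -> state` functions; `never_raise` is a hypothetical helper, not something the codebase is known to contain, and a method-based agent would need a variant that passes `self` through.

```python
import functools
import logging
from typing import Any, Callable, Dict

logger = logging.getLogger(__name__)
State = Dict[str, Any]


def never_raise(label: str) -> Callable[[Callable[[State], State]], Callable[[State], State]]:
    """Wrap a state -> state function so exceptions are recorded in state["errors"]."""
    def decorator(func: Callable[[State], State]) -> Callable[[State], State]:
        @functools.wraps(func)
        def wrapper(state: State) -> State:
            try:
                return func(state)
            except Exception as exc:
                logger.error("%s failed: %s", label, exc, exc_info=True)
                # setdefault guards against a state that was created without "errors"
                state.setdefault("errors", []).append(f"{label} failed: {exc}")
                return state
        return wrapper
    return decorator


@never_raise("Demo agent")
def failing_run(state: State) -> State:
    raise ValueError("boom")


result = failing_run({"query": "transformers"})
print(result["errors"])  # ['Demo agent failed: boom']
```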

Error Handling Strategies by Agent

RetrieverAgent: Fallback + Partial Success

# Two-tier fallback for search
papers = self._search_with_fallback(query, max_results, category)
if not papers:
    state["errors"].append("No papers found")
    return state  # Early return, not exception

# Continue with partial results on download failures
for paper in papers:
    try:
        pdf_path = self._download_with_fallback(paper)
    except Exception as e:
        logger.warning(f"Skipping {paper.arxiv_id}: {str(e)}")
        continue  # Process other papers

AnalyzerAgent: Circuit Breaker + Minimal Analysis

# Circuit breaker: Stop after 2 consecutive failures
if self.consecutive_failures >= self.max_consecutive_failures:
    logger.error("Circuit breaker triggered")
    break

# On failure, return minimal analysis with confidence=0.0
except Exception as e:
    return Analysis(
        arxiv_id=paper.arxiv_id,
        title=paper.title,
        methodology=[], key_findings=[], conclusions=[],
        limitations=[], contributions=[],
        confidence_score=0.0  # Signal failure
    )

SynthesisAgent: Paper Count Mismatch Handling

# Defensive: Handle mismatched paper/analysis counts
if len(papers) != len(analyses):
    logger.warning(f"Count mismatch: {len(papers)} papers, {len(analyses)} analyses")
    min_len = min(len(papers), len(analyses))
    papers = papers[:min_len]
    analyses = analyses[:min_len]

# On failure, return empty synthesis with confidence=0.0
except Exception as e:
    state["synthesis"] = SynthesisResult(
        consensus_points=[], contradictions=[], research_gaps=[],
        summary=f"Synthesis failed: {str(e)}",
        papers_analyzed=[p.arxiv_id for p in state.get("papers", [])],
        confidence_score=0.0
    )

CitationAgent: Rare Failures (Data Transformation Only)

# Simpler error handling (no LLM, no external APIs)
try:
    # Pure data transformation
    citations = [self._format_apa_citation(p) for p in papers]
    cost = self.calculate_cost(token_usage, model_desc)
    return state
except Exception as e:
    logger.error(f"Citation failed: {str(e)}")
    state["errors"].append(f"Citation failed: {str(e)}")
    return state

Confidence Score as Quality Signal

All agents that can fail use confidence_score to indicate quality:

# High confidence: Successful analysis with good context
Analysis(confidence_score=0.85, ...)

# Low confidence: Successful but limited context (below the 0.7 filter threshold)
Analysis(confidence_score=0.45, ...)

# Zero confidence: Failure (filter node removes these)
Analysis(confidence_score=0.0, ...)

Filter Node uses confidence scores to remove bad analyses:

# orchestration/nodes.py:74-107
@observe(name="filter_low_confidence", as_type="span")
def filter_node(state: AgentState) -> AgentState:
    analyses = state.get("analyses", [])
    threshold = 0.7  # Configurable

    filtered = [a for a in analyses if a.confidence_score >= threshold]

    logger.info(
        f"Filtered {len(filtered)}/{len(analyses)} analyses "
        f"(threshold={threshold})"
    )

    state["filtered_analyses"] = filtered
    return state
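The threshold comparison is inclusive, which is easy to check with toy data. In this sketch `ToyAnalysis` is a dataclass stand-in for the real `Analysis` model and `filter_by_confidence` is a hypothetical free function with the same list comprehension as the node.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class ToyAnalysis:
    """Stand-in for the Analysis model with only the fields needed here."""
    arxiv_id: str
    confidence_score: float


def filter_by_confidence(analyses: List[ToyAnalysis], threshold: float = 0.7) -> List[ToyAnalysis]:
    """Keep analyses whose confidence is at or above the threshold."""
    return [a for a in analyses if a.confidence_score >= threshold]


analyses = [
    ToyAnalysis("paper-a", 0.85),
    ToyAnalysis("paper-b", 0.70),  # exactly at the threshold: kept
    ToyAnalysis("paper-c", 0.45),  # successful but low confidence: dropped
    ToyAnalysis("paper-d", 0.0),   # failed analysis: dropped
]
kept = filter_by_confidence(analyses)
print([a.arxiv_id for a in kept])  # ['paper-a', 'paper-b']
```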

Observability Integration

Three-Tier Tracing Architecture

Tier 1: Node-Level Tracing (orchestration layer)

# orchestration/nodes.py
@observe(name="analyzer_agent", as_type="span")
def analyzer_node(state: AgentState, analyzer_agent) -> AgentState:
    logger.info("Starting analyzer agent...")
    updated_state = analyzer_agent.run(state)
    logger.info(f"Analyzer completed. Analyses: {len(updated_state.get('analyses', []))}")
    return updated_state

What's Captured:

  • Node execution duration
  • Input/output state snapshots
  • Errors caught by node wrapper

Tier 2: Agent-Level Tracing (agent logic)

# agents/analyzer.py
@observe(name="analyzer_agent_run", as_type="generation")
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    # Agent logic...
    return state

What's Captured:

  • Agent execution duration
  • State transformations
  • Agent-specific metadata (paper count, analysis count)

Tier 3: LLM-Level Tracing (automatic instrumentation)

# utils/langfuse_client.py:74-94
from langfuse.openai import openai

def instrument_openai():
    """
    Instrument Azure OpenAI client for automatic tracing.
    All chat.completions.create() calls are automatically traced.
    """
    langfuse_client = get_langfuse_client()
    if langfuse_client:
        openai.langfuse_auth(langfuse_client)

What's Captured:

  • Full prompt (system + user messages)
  • Full completion (response text)
  • Token usage (prompt_tokens, completion_tokens)
  • Model metadata (model, temperature, max_tokens)
  • Latency (time to first token, total time)
  • Cost (calculated from token usage)

@observe Decorator Patterns

Generation Type (for LLM-heavy agents):

@observe(name="analyzer_agent_run", as_type="generation")
def run(self, state):
    # Marks this as an LLM generation task
    # LangFuse shows token usage, cost, latency
    pass

Span Type (for data processing):

@observe(name="filter_low_confidence", as_type="span")
def filter_node(state):
    # Marks this as a processing step
    # LangFuse shows duration, input/output
    pass

Nested Tracing (automatic):

retriever_node()  # Creates span "retriever_agent"
  └─ retriever_agent.run()  # Creates generation "retriever_agent_run"
      └─ embedding_generator.generate_batch()  # Creates generation "embeddings"
          └─ Azure OpenAI API call  # Automatic instrumentation

Session and Trace ID Tracking

# app.py:421-434
import uuid

# Generate unique session ID per workflow execution
session_id = f"session-{uuid.uuid4().hex[:8]}"

initial_state = create_initial_state(
    query=query,
    category=category,
    num_papers=num_papers,
    model_desc=model_desc,
    start_time=start_time,
    session_id=session_id,
    user_id=None  # Optional: for multi-user tracking
)

Use Cases:

  • Session Grouping: Group all traces from a single workflow execution
  • User Tracking: Analyze behavior across multiple sessions
  • Debugging: Find all traces for a failed session

Graceful Degradation When LangFuse Unavailable

# utils/langfuse_client.py:97-138
def observe(name: str = None, as_type: str = "span", **kwargs):
    """
    Wrapper for @observe decorator with graceful degradation.

    If LangFuse not configured, returns identity decorator.
    """
    langfuse_client = get_langfuse_client()

    if langfuse_client is None:
        # Return no-op decorator
        def identity_decorator(func):
            return func
        return identity_decorator

    # Return actual LangFuse decorator
    from langfuse.decorators import observe as langfuse_observe
    return langfuse_observe(name=name, as_type=as_type, **kwargs)

Why This Pattern?

  • Optional Observability: App works without LangFuse configured
  • No Import Errors: Doesn't fail if langfuse package missing
  • Zero Code Changes: Same decorator usage regardless of config
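The same idea can be applied at import time for any optional dependency. The following is a generic sketch rather than the code in utils/langfuse_client.py: `optional_decorator` is a hypothetical helper and the module name is deliberately one that does not exist.

```python
import importlib
from typing import Any, Callable


def optional_decorator(module_name: str, attr: str, **kwargs: Any) -> Callable:
    """Return module.attr(**kwargs) as a decorator, or a no-op if the import fails."""
    try:
        factory = getattr(importlib.import_module(module_name), attr)
    except (ImportError, AttributeError):
        return lambda func: func  # identity decorator
    return factory(**kwargs)


# "tracing_lib_that_is_not_installed" is a made-up, deliberately missing module
@optional_decorator("tracing_lib_that_is_not_installed", "observe", name="demo")
def add(a: int, b: int) -> int:
    return a + b


print(add(2, 3))  # 5
```

The decorated function behaves identically whether or not the optional package is present, which is the property the wrapper relies on.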

Performance Optimizations

Parallel Processing in AnalyzerAgent

# agents/analyzer.py:333-359
from concurrent.futures import ThreadPoolExecutor, as_completed

def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    papers = state.get("papers", [])
    analyses = []

    # Parallel analysis with ThreadPoolExecutor
    with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
        # Submit all papers for analysis
        future_to_paper = {
            executor.submit(self.analyze_paper, paper): paper
            for paper in papers
        }

        # Collect results as they complete
        for future in as_completed(future_to_paper):
            paper = future_to_paper[future]
            try:
                analysis = future.result()
                analyses.append(analysis)
            except Exception as e:
                logger.error(f"Failed to analyze {paper.arxiv_id}: {str(e)}")

Performance Impact:

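  • Serial: 5 papers × 60s = 300s (5 minutes)
  • Parallel (4 workers): ~120s when each analysis takes the full 60s, since 5 papers need two waves (60% reduction); the ideal bound is 300s / 4 = 75s (75% reduction)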
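A back-of-the-envelope model helps when choosing `max_workers`. The sketch below assumes every task takes the same time, which real LLM calls will not; both function names are hypothetical.

```python
import math


def wave_estimate(num_tasks: int, workers: int, seconds_per_task: float) -> float:
    """Wall-clock estimate when every task takes the same time and runs in waves."""
    return math.ceil(num_tasks / workers) * seconds_per_task


def ideal_estimate(num_tasks: int, workers: int, seconds_per_task: float) -> float:
    """Lower bound that assumes work divides perfectly across workers."""
    return num_tasks * seconds_per_task / workers


print(wave_estimate(5, 4, 60))   # 120
print(ideal_estimate(5, 4, 60))  # 75.0
```

With 5 papers and 4 workers the fifth paper waits for a free worker, so the wave estimate is the more realistic figure; raising `max_workers` to 5 would bring it down to 60s under the same assumption.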

Thread Safety:

# agents/analyzer.py:48-51
import threading

def __init__(self, ...):
    self.token_lock = threading.Lock()
    self.total_input_tokens = 0
    self.total_output_tokens = 0

# In analyze_paper() method
with self.token_lock:
    self.total_input_tokens += response.usage.prompt_tokens
    self.total_output_tokens += response.usage.completion_tokens

Circuit Breaker Pattern

# agents/analyzer.py:54-57
def __init__(self, ...):
    self.consecutive_failures = 0
    self.max_consecutive_failures = 2

# In run() method
for future in as_completed(future_to_paper):
    # Check circuit breaker BEFORE processing next result
    if self.consecutive_failures >= self.max_consecutive_failures:
        logger.error(f"Circuit breaker triggered after {self.consecutive_failures} failures")
        break

    # Process result
    analysis = future.result()
    if analysis.confidence_score > 0:
        self.consecutive_failures = 0  # Reset on success
    else:
        self.consecutive_failures += 1

Why Circuit Breaker?

  • Fail Fast: Stops after 2 consecutive failures instead of spending up to 3 more LLM calls
  • Cost Savings: Prevents runaway API usage on systemic failures
  • User Experience: Faster failure feedback
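The counter logic can be tested without threads or LLM calls by feeding it a sequence of confidence scores. This is a simplified sketch of the same reset-on-success rule; `consume_until_tripped` is a hypothetical function name.

```python
from typing import Iterable, List


def consume_until_tripped(scores: Iterable[float], max_consecutive_failures: int = 2) -> List[float]:
    """Collect confidence scores until the breaker trips on consecutive zero scores."""
    collected: List[float] = []
    consecutive_failures = 0
    for score in scores:
        if consecutive_failures >= max_consecutive_failures:
            break  # breaker open: stop consuming further results
        collected.append(score)
        if score > 0:
            consecutive_failures = 0  # any success resets the counter
        else:
            consecutive_failures += 1
    return collected


print(consume_until_tripped([0.8, 0.0, 0.0, 0.9, 0.7]))  # [0.8, 0.0, 0.0]
print(consume_until_tripped([0.0, 0.8, 0.0, 0.9]))       # [0.0, 0.8, 0.0, 0.9]
```

The second call shows that isolated failures do not trip the breaker, because each success resets the counter.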

Batch Operations

Embedding Generation (RetrieverAgent):

# rag/embeddings.py
def generate_batch(self, chunks: List[PaperChunk]) -> List[List[float]]:
    """
    Generate embeddings for multiple chunks in a single API call.

    Azure OpenAI supports batch size up to 2048 inputs.
    """
    texts = [chunk.content for chunk in chunks]

    # Single API call for all chunks
    response = self.client.embeddings.create(
        model=self.deployment_name,
        input=texts  # List of strings
    )

    return [item.embedding for item in response.data]

Performance Impact:

  • Serial: 100 chunks × 50ms = 5000ms
  • Batch: 1 call × 200ms = 200ms (96% reduction)
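`generate_batch` as shown sends every chunk in one request, so a corpus larger than the batch limit quoted in its docstring would need to be split first. A minimal splitting sketch follows; `iter_batches` is a hypothetical helper and the 2048 default simply reuses that quoted figure.

```python
from typing import Iterator, List, Sequence


def iter_batches(texts: Sequence[str], batch_size: int = 2048) -> Iterator[List[str]]:
    """Yield consecutive slices of at most batch_size texts."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(texts), batch_size):
        yield list(texts[start:start + batch_size])


texts = [f"chunk {i}" for i in range(5000)]
sizes = [len(batch) for batch in iter_batches(texts)]
print(sizes)  # [2048, 2048, 904]
```

Each yielded batch could then be passed to the embeddings call in turn, with the results concatenated in order.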

Timeout Configuration

Different agents have different timeout needs:

# AnalyzerAgent (60s) - moderate timeout
self.client.chat.completions.create(
    timeout=60  # Analyzing single paper
)

# SynthesisAgent (90s) - longer timeout
self.client.chat.completions.create(
    timeout=90  # Cross-paper synthesis more complex
)

Why Different Timeouts?

  • Synthesis is slower: Processes all papers simultaneously, larger context
  • Prevents premature failures: Allows complex reasoning to complete
  • Still bounded: Avoids infinite hangs

5. Workflow Orchestration

LangGraph Workflow Structure

The workflow is defined in orchestration/workflow_graph.py:

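from typing import Any

from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
from utils.langgraph_state import AgentState
# Node wrappers and routing functions (orchestration/nodes.py)
from orchestration.nodes import (
    retriever_node, analyzer_node, filter_node, synthesis_node,
    citation_node, finalize_node,
    should_continue_after_retriever, should_continue_after_filter,
)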

def create_workflow_graph(
    retriever_agent,
    analyzer_agent,
    synthesis_agent,
    citation_agent,
    use_checkpointing: bool = True
) -> Any:
    """
    Create LangGraph workflow with all agents and conditional routing.
    """
    # Create state graph
    workflow = StateGraph(AgentState)

    # Add nodes (lambda binds agent instances)
    workflow.add_node("retriever", lambda state: retriever_node(state, retriever_agent))
    workflow.add_node("analyzer", lambda state: analyzer_node(state, analyzer_agent))
    workflow.add_node("filter", filter_node)
    workflow.add_node("synthesis", lambda state: synthesis_node(state, synthesis_agent))
    workflow.add_node("citation", lambda state: citation_node(state, citation_agent))
    workflow.add_node("finalize", finalize_node)

    # Set entry point
    workflow.set_entry_point("retriever")

    # Add conditional edges
    workflow.add_conditional_edges(
        "retriever",
        should_continue_after_retriever,
        {
            "continue": "analyzer",
            "end": END
        }
    )

    workflow.add_conditional_edges(
        "filter",
        should_continue_after_filter,
        {
            "continue": "synthesis",
            "end": END
        }
    )

    # Add standard edges
    workflow.add_edge("analyzer", "filter")
    workflow.add_edge("synthesis", "citation")
    workflow.add_edge("citation", "finalize")
    workflow.add_edge("finalize", END)

    # Compile with checkpointing
    if use_checkpointing:
        checkpointer = MemorySaver()
        return workflow.compile(checkpointer=checkpointer)
    else:
        return workflow.compile()

Complete Workflow Flow:

START
  ↓
retriever
  ↓
[Check: papers found?]
  ├─ No → END
  └─ Yes → analyzer
            ↓
          filter
            ↓
          [Check: valid analyses?]
            ├─ No → END
            └─ Yes → synthesis
                      ↓
                    citation
                      ↓
                    finalize
                      ↓
                    END

Node Wrapper Pattern

Purpose of Node Wrappers:

  1. Orchestration Concerns: Tracing, logging, error handling
  2. Agent Logic Isolation: Keeps agents pure and testable
  3. Consistent Interface: All nodes follow same pattern

Standard Node Wrapper Template:

from utils.langfuse_client import observe  # wrapper with graceful degradation
from utils.langgraph_state import AgentState
import logging

logger = logging.getLogger(__name__)

@observe(name="<agent_name>", as_type="<span|generation>")
def <agent>_node(state: AgentState, agent_instance) -> AgentState:
    """
    Node wrapper for <AgentName>.

    Responsibilities:
    - LangFuse tracing (via @observe)
    - Structured logging
    - Error handling
    - State transformation delegation
    """
    logger.info("Starting <agent_name> agent...")

    try:
        # Delegate to agent's run() method
        updated_state = agent_instance.run(state)

        # Log completion with metrics
        logger.info(f"<Agent> completed. <Metric>: {len(updated_state.get('<key>', []))}")
        return updated_state

    except Exception as e:
        # Catch-all error handling
        logger.error(f"<Agent> node failed: {str(e)}", exc_info=True)
        state["errors"].append(f"<Agent> failed: {str(e)}")
        return state  # Return original state on failure

Example: FilterNode (standalone logic, no agent instance)

# orchestration/nodes.py:74-107
@observe(name="filter_low_confidence", as_type="span")
def filter_node(state: AgentState) -> AgentState:
    """
    Filter out low-confidence analyses.

    Note: This is NOT an agent wrapper - it's standalone logic.
    """
    analyses = state.get("analyses", [])
    threshold = 0.7

    # Filter logic
    filtered = [a for a in analyses if a.confidence_score >= threshold]

    logger.info(
        f"Filtered {len(filtered)}/{len(analyses)} analyses "
        f"(threshold={threshold})"
    )

    state["filtered_analyses"] = filtered
    return state

Conditional Routing

Two Routing Decision Points:

1. After Retriever: Check if Papers Found

# orchestration/nodes.py:168-179
def should_continue_after_retriever(state: AgentState) -> str:
    """
    Route based on paper retrieval success.

    Returns:
        "continue": Papers found, proceed to analyzer
        "end": No papers found, terminate workflow
    """
    papers = state.get("papers", [])

    if len(papers) == 0:
        logger.warning("No papers retrieved. Ending workflow.")
        return "end"

    logger.info(f"Retrieved {len(papers)} papers. Continuing to analyzer.")
    return "continue"

Why Early Termination?

  • Cost Savings: No point running LLM analysis if no papers
  • User Experience: Immediate feedback that search failed
  • Error Clarity: Clear error message vs generic "no results"

2. After Filter: Check if Valid Analyses Remain

# orchestration/nodes.py:182-193
def should_continue_after_filter(state: AgentState) -> str:
    """
    Route based on filter results.

    Returns:
        "continue": Valid analyses exist, proceed to synthesis
        "end": All analyses filtered out, terminate workflow
    """
    filtered = state.get("filtered_analyses", [])

    if len(filtered) == 0:
        logger.warning("No valid analyses after filtering. Ending workflow.")
        return "end"

    logger.info(f"{len(filtered)} valid analyses. Continuing to synthesis.")
    return "continue"

Why Filter Check?

  • Quality Gate: Prevents synthesis on all-failed analyses
  • Confidence Threshold: Only synthesizes high-quality analyses (confidence >= 0.7)
  • Cost Savings: Avoids synthesis LLM call on garbage data
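The two routing decisions can be unit-tested without LangGraph by treating them as plain functions over a state dict. The sketch below simplifies the routers (no logging) and adds a hypothetical `planned_path` helper that lists which nodes would run for a given state.

```python
from typing import Any, Dict, List

State = Dict[str, Any]


def after_retriever(state: State) -> str:
    """Simplified should_continue_after_retriever."""
    return "continue" if state.get("papers") else "end"


def after_filter(state: State) -> str:
    """Simplified should_continue_after_filter."""
    return "continue" if state.get("filtered_analyses") else "end"


def planned_path(state: State) -> List[str]:
    """List the nodes that would run for a given state (routing only, no agents)."""
    path = ["retriever"]
    if after_retriever(state) == "end":
        return path
    path += ["analyzer", "filter"]
    if after_filter(state) == "end":
        return path
    return path + ["synthesis", "citation", "finalize"]


print(planned_path({"papers": []}))
# ['retriever']
print(planned_path({"papers": ["p1"], "filtered_analyses": []}))
# ['retriever', 'analyzer', 'filter']
print(planned_path({"papers": ["p1"], "filtered_analyses": ["a1"]}))
# ['retriever', 'analyzer', 'filter', 'synthesis', 'citation', 'finalize']
```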

Checkpointing and State Persistence

MemorySaver Checkpointer:

# orchestration/workflow_graph.py:120-126
from langgraph.checkpoint.memory import MemorySaver

if use_checkpointing:
    checkpointer = MemorySaver()
    return workflow.compile(checkpointer=checkpointer)

What Gets Checkpointed:

  • State after each node: Full AgentState dictionary
  • Serialized to msgpack: Efficient binary format
  • Stored in memory: Checkpointer holds state history

Use Cases:

1. Workflow Resumption:

# workflow_app is the compiled graph; the thread ID selects its checkpoint history
config = {"configurable": {"thread_id": "thread-abc123"}}
snapshot = workflow_app.get_state(config)  # StateSnapshot of the latest checkpoint

# Passing None as input resumes from the saved checkpoint
final_state = workflow_app.invoke(None, config)

2. Debugging:

# Inspect state after analyzer node (the snapshot whose next node is "filter")
config = {"configurable": {"thread_id": "thread-abc123"}}
for snapshot in workflow_app.get_state_history(config):
    if snapshot.next == ("filter",):
        print(f"Analyses: {snapshot.values['analyses']}")
        break

3. Time Travel (Replay):

# Re-run from a specific checkpoint (a StateSnapshot) with different parameters
forked_config = workflow_app.update_state(snapshot.config, {"num_papers": 10})  # Change parameter
workflow_app.invoke(None, forked_config)

Configuration:

# app.py:464-470
config = {
    "configurable": {
        "thread_id": session_id  # Unique per execution
    }
}

final_state = run_workflow(workflow_app, initial_state, config, progress)

Why Checkpointing?

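  • Resilience: Can resume after a failure within the same process (MemorySaver keeps checkpoints in memory, so they do not survive a restart)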
  • Debugging: Inspect intermediate state
  • Experimentation: Replay from checkpoints with different configs

6. Building New Agents

Step-by-Step Development Guide

Follow this workflow to create a new agent that integrates seamlessly with the system:

Step 1: Define Agent Responsibilities

Questions to Answer:

  • What specific task does this agent perform?
  • What are its inputs (which state keys)?
  • What are its outputs (which state keys added/modified)?
  • Does it call external APIs or LLMs?
  • Can it fail? How should it degrade gracefully?

Example: SummarizerAgent

  • Task: Generate concise summaries for each paper
  • Inputs: papers, chunks
  • Outputs: summaries (List[PaperSummary])
  • External Calls: Azure OpenAI (LLM)
  • Failure Mode: Return empty summary with confidence=0.0

Step 2: Create Pydantic Schemas

Add output schemas to utils/schemas.py:

# utils/schemas.py
from pydantic import BaseModel, Field
from typing import List

class PaperSummary(BaseModel):
    """Summary of a single paper."""
    arxiv_id: str = Field(..., description="arXiv ID of the paper")
    title: str = Field(..., description="Paper title")
    summary: str = Field(..., description="3-4 sentence summary")
    key_points: List[str] = Field(default_factory=list, description="Bullet points")
    confidence_score: float = Field(..., ge=0.0, le=1.0, description="Summary quality")

Step 3: Implement Agent Class

Create agents/summarizer.py:

from typing import Dict, Any, List
import logging
import json
from openai import AzureOpenAI
from utils.schemas import Paper, PaperChunk, PaperSummary
from langfuse.decorators import observe

logger = logging.getLogger(__name__)

class SummarizerAgent:
    """Generates concise summaries for each paper."""

    def __init__(
        self,
        azure_openai_config: Dict[str, str],
        max_summary_tokens: int = 500,
        timeout: int = 30
    ):
        """
        Initialize SummarizerAgent.

        Args:
            azure_openai_config: Azure OpenAI credentials
            max_summary_tokens: Max tokens for summary generation
            timeout: LLM call timeout in seconds
        """
        self.deployment_name = azure_openai_config["deployment_name"]
        self.max_summary_tokens = max_summary_tokens
        self.timeout = timeout

        # Initialize Azure OpenAI client
        self.client = AzureOpenAI(
            api_key=azure_openai_config["api_key"],
            api_version=azure_openai_config.get("api_version", "2024-02-01"),
            azure_endpoint=azure_openai_config["endpoint"]
        )

    def _create_summary_prompt(self, paper: Paper, chunks: List[PaperChunk]) -> str:
        """Create prompt for summarization."""
        # Get abstract and introduction chunks
        relevant_chunks = [
            c for c in chunks
            if c.paper_id == paper.arxiv_id and c.section in ["abstract", "introduction"]
        ][:5]  # First 5 chunks

        context = "\n\n".join([c.content for c in relevant_chunks])

        prompt = f"""
Summarize this research paper concisely.

Title: {paper.title}
Authors: {', '.join(paper.authors[:3])}

Paper Content:
{context}

Provide:
1. A 3-4 sentence summary
2. 3-5 key points (bullet list)

Return as JSON:
{{
    "summary": "3-4 sentence summary here...",
    "key_points": ["Point 1", "Point 2", ...],
    "confidence_score": 0.85
}}
"""
        return prompt

    def _normalize_summary_response(self, data: dict, paper: Paper) -> dict:
        """Normalize LLM response to match Pydantic schema."""
        def ensure_string(value):
            return str(value) if value else ""

        def ensure_list_of_strings(value):
            if isinstance(value, list):
                return [str(item) for item in value if item]
            return [str(value)] if value else []

        return {
            "arxiv_id": paper.arxiv_id,
            "title": paper.title,
            "summary": ensure_string(data.get("summary", "")),
            "key_points": ensure_list_of_strings(data.get("key_points", [])),
            "confidence_score": float(data.get("confidence_score", 0.0))
        }

    def summarize_paper(self, paper: Paper, chunks: List[PaperChunk]) -> PaperSummary:
        """
        Summarize a single paper.

        Args:
            paper: Paper metadata
            chunks: All chunks (filtered to this paper in method)

        Returns:
            PaperSummary with summary, key points, confidence
        """
        try:
            # Create prompt
            prompt = self._create_summary_prompt(paper, chunks)

            # Call LLM
            response = self.client.chat.completions.create(
                model=self.deployment_name,
                messages=[
                    {"role": "system", "content": "You are a research paper summarizer."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.0,  # Deterministic
                response_format={"type": "json_object"},
                max_tokens=self.max_summary_tokens,
                timeout=self.timeout
            )

            # Parse and normalize
            data = json.loads(response.choices[0].message.content)
            normalized = self._normalize_summary_response(data, paper)

            # Create Pydantic model
            summary = PaperSummary(**normalized)

            logger.info(f"Summarized {paper.arxiv_id} (confidence={summary.confidence_score:.2f})")
            return summary

        except Exception as e:
            logger.error(f"Failed to summarize {paper.arxiv_id}: {str(e)}", exc_info=True)

            # Return minimal summary with confidence=0.0
            return PaperSummary(
                arxiv_id=paper.arxiv_id,
                title=paper.title,
                summary="",
                key_points=[],
                confidence_score=0.0
            )

    @observe(name="summarizer_agent_run", as_type="generation")
    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Run summarizer agent on all papers.

        Args:
            state: Workflow state (requires 'papers' and 'chunks' keys)

        Returns:
            Updated state with 'summaries' key added
        """
        try:
            papers = state.get("papers", [])
            chunks = state.get("chunks", [])

            if not papers:
                logger.warning("No papers to summarize")
                state["summaries"] = []
                return state

            # Summarize each paper
            summaries = []
            for paper in papers:
                summary = self.summarize_paper(paper, chunks)
                summaries.append(summary)

            # Update state
            state["summaries"] = summaries

            logger.info(f"Summarized {len(summaries)} papers")
            return state

        except Exception as e:
            logger.error(f"Summarizer agent failed: {str(e)}", exc_info=True)
            state.setdefault("errors", []).append(f"Summarizer failed: {str(e)}")
            state["summaries"] = []
            return state  # Never raise
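Because `PaperSummary.confidence_score` is constrained to the range 0.0 to 1.0, an out-of-range or non-numeric value from the LLM would fail validation and push the whole summary onto the fallback path. One possible mitigation is to coerce the value during normalization. The helper below is a hypothetical sketch (`coerce_confidence` is not part of the codebase):

```python
import math
from typing import Any


def coerce_confidence(value: Any, default: float = 0.0) -> float:
    """Best-effort conversion of an LLM-provided confidence to a float in [0.0, 1.0]."""
    try:
        score = float(value)
    except (TypeError, ValueError):
        return default  # e.g. "high", None, or a nested object
    if math.isnan(score):
        return default
    return min(1.0, max(0.0, score))  # clamp into the schema's range


print(coerce_confidence("0.85"))
print(coerce_confidence(1.7))
print(coerce_confidence("high"))
```

Whether clamping is preferable to rejecting the response is a design choice: clamping keeps an otherwise usable summary, while rejecting makes malformed output more visible.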

Step 4: Add Observability Decorators

Already added in Step 3:

@observe(name="summarizer_agent_run", as_type="generation")
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    # Agent logic...

Decorator Type Selection:

  • Use as_type="generation" if agent calls LLM
  • Use as_type="span" if agent only processes data
  • The decorator is automatically a no-op if LangFuse is not configured
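How the no-op fallback is implemented in `utils/langfuse_client.py` is not shown here, so the following is only a sketch of one way such a fallback could work. `noop_observe` and `select_observe` are hypothetical names. The environment variable names are the standard LangFuse credential variables, and gating on them is an assumption.

```python
import functools
from typing import Any, Callable, Mapping


def noop_observe(*_args: Any, **_kwargs: Any) -> Callable:
    """Stand-in for @observe(...) that runs the wrapped function unchanged."""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            return func(*args, **kwargs)
        return wrapper
    return decorator


def select_observe(environ: Mapping[str, str]) -> Callable:
    """Return the real decorator only when LangFuse credentials are present."""
    if environ.get("LANGFUSE_PUBLIC_KEY") and environ.get("LANGFUSE_SECRET_KEY"):
        from langfuse.decorators import observe  # imported lazily, only when needed
        return observe
    return noop_observe


observe = select_observe({})  # no credentials supplied, so tracing is disabled


@observe(name="demo_agent_run", as_type="span")
def run(state: dict) -> dict:
    state["output_key"] = ["ok"]
    return state


print(run({}))
```

Passing the environment in as a mapping (rather than reading `os.environ` directly) keeps the selection logic easy to test.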

Step 5: Create Node Wrapper

Add to orchestration/nodes.py:

# orchestration/nodes.py
from langfuse.decorators import observe
from utils.langgraph_state import AgentState
import logging

logger = logging.getLogger(__name__)

@observe(name="summarizer_agent", as_type="span")
def summarizer_node(state: AgentState, summarizer_agent) -> AgentState:
    """
    Node wrapper for SummarizerAgent.

    Responsibilities:
    - LangFuse tracing
    - Structured logging
    - Error handling
    """
    logger.info("Starting summarizer agent...")

    try:
        updated_state = summarizer_agent.run(state)

        summaries = updated_state.get("summaries", [])
        logger.info(f"Summarizer completed. Summaries: {len(summaries)}")

        return updated_state

    except Exception as e:
        logger.error(f"Summarizer node failed: {str(e)}", exc_info=True)
        state.setdefault("errors", []).append(f"Summarizer node failed: {str(e)}")
        return state

Step 6: Add to Workflow Graph

Update orchestration/workflow_graph.py:

def create_workflow_graph(
    retriever_agent,
    analyzer_agent,
    summarizer_agent,  # NEW: Add parameter
    synthesis_agent,
    citation_agent,
    use_checkpointing: bool = True
):
    workflow = StateGraph(AgentState)

    # Add nodes
    workflow.add_node("retriever", lambda state: retriever_node(state, retriever_agent))
    workflow.add_node("analyzer", lambda state: analyzer_node(state, analyzer_agent))
    workflow.add_node("summarizer", lambda state: summarizer_node(state, summarizer_agent))  # NEW
    workflow.add_node("filter", filter_node)
    workflow.add_node("synthesis", lambda state: synthesis_node(state, synthesis_agent))
    workflow.add_node("citation", lambda state: citation_node(state, citation_agent))
    workflow.add_node("finalize", finalize_node)

    # Set entry point
    workflow.set_entry_point("retriever")

    # Add edges (NEW: Insert summarizer between retriever and analyzer)
    workflow.add_edge("retriever", "summarizer")  # NEW
    workflow.add_edge("summarizer", "analyzer")   # NEW
    # workflow.add_edge("retriever", "analyzer")  # REMOVE: Old direct edge

    workflow.add_edge("analyzer", "filter")
    workflow.add_edge("filter", "synthesis")
    workflow.add_edge("synthesis", "citation")
    workflow.add_edge("citation", "finalize")
    workflow.add_edge("finalize", END)

    # Compile with checkpointing
    if use_checkpointing:
        checkpointer = MemorySaver()
        return workflow.compile(checkpointer=checkpointer)
    else:
        return workflow.compile()

Step 7: Update Conditional Routing (if needed)

If your agent can fail and should terminate the workflow:

# orchestration/nodes.py
def should_continue_after_summarizer(state: AgentState) -> str:
    """
    Route based on summarizer success.

    Returns:
        "continue": Summaries generated, proceed
        "end": All summaries failed, terminate
    """
    summaries = state.get("summaries", [])

    # Filter successful summaries (confidence > 0)
    valid_summaries = [s for s in summaries if s.confidence_score > 0]

    if len(valid_summaries) == 0:
        logger.warning("No valid summaries. Ending workflow.")
        return "end"

    logger.info(f"{len(valid_summaries)} valid summaries. Continuing.")
    return "continue"

# In workflow graph (use this instead of the unconditional "summarizer" -> "analyzer" edge from Step 6)
workflow.add_conditional_edges(
    "summarizer",
    should_continue_after_summarizer,
    {
        "continue": "analyzer",
        "end": END
    }
)
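The routing decision can be checked in isolation before wiring it into a graph. The sketch below restates the router compactly and applies the same routing map by hand. `resolve_next` is a hypothetical helper, `SimpleNamespace` objects stand in for `PaperSummary`, and the `END` string is a placeholder for the real sentinel.

```python
from types import SimpleNamespace
from typing import Any, Dict

END = "END"  # placeholder for the graph library's END sentinel


def should_continue_after_summarizer(state: Dict[str, Any]) -> str:
    """Return "continue" if any summary has confidence > 0, else "end"."""
    summaries = state.get("summaries", [])
    return "continue" if any(s.confidence_score > 0 for s in summaries) else "end"


ROUTES = {"continue": "analyzer", "end": END}


def resolve_next(state: Dict[str, Any]) -> str:
    """Apply the routing map to the router's decision."""
    return ROUTES[should_continue_after_summarizer(state)]


ok = SimpleNamespace(confidence_score=0.8)
failed = SimpleNamespace(confidence_score=0.0)

print(resolve_next({"summaries": [failed, ok]}))      # one valid summary is enough
print(resolve_next({"summaries": [failed, failed]}))  # all failed
print(resolve_next({}))                               # key missing entirely
```

A single valid summary keeps the workflow going, which matches the graceful-degradation principle: only a total failure terminates the run.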

Step 8: Initialize Agent in app.py

# app.py
from agents.summarizer import SummarizerAgent

class ResearchPaperAnalyzer:
    def __init__(self):
        # ... existing initialization ...

        # Initialize new agent
        self.summarizer_agent = SummarizerAgent(
            azure_openai_config=azure_config,
            max_summary_tokens=500,
            timeout=30
        )

        # Create workflow with new agent
        self.workflow_app = create_workflow_graph(
            retriever_agent=self.retriever_agent,
            analyzer_agent=self.analyzer_agent,
            summarizer_agent=self.summarizer_agent,  # NEW
            synthesis_agent=self.synthesis_agent,
            citation_agent=self.citation_agent
        )

Step 9: Update AgentState TypedDict

Add new state keys to utils/langgraph_state.py:

# utils/langgraph_state.py
from typing import TypedDict, List, Optional
from utils.schemas import (
    Paper, PaperChunk, Analysis, SynthesisResult, ValidatedOutput,
    PaperSummary,  # NEW import
)

class AgentState(TypedDict, total=False):
    # ... existing fields ...

    # Agent outputs
    papers: List[Paper]
    chunks: List[PaperChunk]
    summaries: List[PaperSummary]  # NEW: Summaries from SummarizerAgent
    analyses: List[Analysis]
    filtered_analyses: List[Analysis]
    synthesis: SynthesisResult
    validated_output: ValidatedOutput

    # ... rest of fields ...
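Since the state is declared with `total=False`, no key is guaranteed to exist at runtime, and agents that call `state["errors"].append(...)` depend on the initial state being fully populated. The appendix mentions initialization helpers in `utils/langgraph_state.py`, but their code is not shown, so `DemoState` and `create_initial_state` below are hypothetical illustrations of the idea.

```python
from typing import Any, Dict, List, TypedDict


class DemoState(TypedDict, total=False):
    query: str
    summaries: List[Any]
    errors: List[str]
    token_usage: Dict[str, int]


def create_initial_state(query: str) -> DemoState:
    """Build a state whose bookkeeping keys always exist."""
    return DemoState(
        query=query,
        summaries=[],
        errors=[],
        token_usage={"input_tokens": 0, "output_tokens": 0, "embedding_tokens": 0},
    )


state = create_initial_state("transformer architectures")
state["errors"].append("Summarizer failed: timeout")  # safe: the list already exists
print(state["errors"])
```

When a new agent adds a state key, giving it an empty default in the initializer keeps downstream `state.get(...)` calls and routers predictable.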

Minimal Agent Template

Use this template as a starting point for new agents:

# agents/template_agent.py
from typing import Dict, Any
import logging
from langfuse.decorators import observe

logger = logging.getLogger(__name__)

class TemplateAgent:
    """
    [Description of what this agent does]
    """

    def __init__(self, dependency1, dependency2, **kwargs):
        """
        Initialize TemplateAgent.

        Args:
            dependency1: Description
            dependency2: Description
            **kwargs: Additional configuration
        """
        self.dependency1 = dependency1
        self.dependency2 = dependency2
        # Initialize any other state

    def _helper_method(self, input_data):
        """Helper method for internal processing."""
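        # Placeholder implementation: delegate to a dependency.
        # `some_method` is the name mocked in the test suite below; replace with real logic.
        return self.dependency1.some_method(input_data)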

    @observe(name="template_agent_run", as_type="span")  # or "generation" if LLM
    def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
        """
        Transform workflow state.

        Args:
            state: Current workflow state

        Returns:
            Updated state with new keys added
        """
        try:
            # 1. Read inputs from state
            input_data = state.get("input_key", [])

            if not input_data:
                logger.warning("No input data found")
                state["output_key"] = []
                return state

            # 2. Process data
            results = self._helper_method(input_data)

            # 3. Update state with outputs
            state["output_key"] = results

            # 4. Log completion
            logger.info(f"TemplateAgent completed. Results: {len(results)}")

            return state

        except Exception as e:
            # 5. Error handling: NEVER raise, always return state
            logger.error(f"TemplateAgent failed: {str(e)}", exc_info=True)
            state.setdefault("errors", []).append(f"TemplateAgent failed: {str(e)}")
            state["output_key"] = []  # Provide default/empty output
            return state

Node Wrapper Template:

# orchestration/nodes.py
@observe(name="template_agent", as_type="span")
def template_node(state: AgentState, template_agent) -> AgentState:
    """Node wrapper for TemplateAgent."""
    logger.info("Starting template agent...")

    try:
        updated_state = template_agent.run(state)
        logger.info("Template agent completed.")
        return updated_state
    except Exception as e:
        logger.error(f"Template node failed: {str(e)}", exc_info=True)
        state.setdefault("errors", []).append(f"Template node failed: {str(e)}")
        return state

Testing Patterns

Create tests/test_template_agent.py:

import pytest
from unittest.mock import Mock, patch
from agents.template_agent import TemplateAgent

class TestTemplateAgent:
    """Test suite for TemplateAgent."""

    @pytest.fixture
    def mock_dependency(self):
        """Mock external dependencies."""
        mock_dep = Mock()
        mock_dep.some_method.return_value = ["result1", "result2"]
        return mock_dep

    @pytest.fixture
    def agent(self, mock_dependency):
        """Create TemplateAgent instance with mocked dependencies."""
        return TemplateAgent(
            dependency1=mock_dependency,
            dependency2=Mock()
        )

    def test_run_success(self, agent):
        """Test successful agent execution."""
        # Arrange
        state = {
            "input_key": ["data1", "data2"],
            "errors": []
        }

        # Act
        result = agent.run(state)

        # Assert
        assert "output_key" in result
        assert len(result["output_key"]) > 0
        assert len(result["errors"]) == 0

    def test_run_empty_input(self, agent):
        """Test agent handles empty input gracefully."""
        # Arrange
        state = {
            "input_key": [],
            "errors": []
        }

        # Act
        result = agent.run(state)

        # Assert
        assert result["output_key"] == []
        assert len(result["errors"]) == 0

    def test_run_missing_input_key(self, agent):
        """Test agent handles missing state keys."""
        # Arrange
        state = {"errors": []}

        # Act
        result = agent.run(state)

        # Assert
        assert result["output_key"] == []
        assert len(result["errors"]) == 0

    def test_run_dependency_failure(self, agent, mock_dependency):
        """Test agent handles dependency failures gracefully."""
        # Arrange
        mock_dependency.some_method.side_effect = Exception("API error")
        state = {
            "input_key": ["data1"],
            "errors": []
        }

        # Act
        result = agent.run(state)

        # Assert
        assert result["output_key"] == []  # Empty on failure
        assert len(result["errors"]) > 0  # Error logged
        assert "TemplateAgent failed" in result["errors"][0]

    def test_state_not_mutated(self, agent):
        """Test that the caller's original dict is untouched when the agent is given a copy."""
        # Arrange
        original_state = {
            "input_key": ["data1"],
            "errors": []
        }
        state_copy = original_state.copy()

        # Act
        result = agent.run(state_copy)

        # Assert
        assert "output_key" not in original_state  # Original unchanged
        assert "output_key" in result  # Result has new key

Run Tests:

# Run all tests for this agent
pytest tests/test_template_agent.py -v

# Run with coverage
pytest tests/test_template_agent.py --cov=agents.template_agent -v

# Run single test
pytest tests/test_template_agent.py::TestTemplateAgent::test_run_success -v

Best Practices Checklist

Use this checklist when building or reviewing agent code:

Agent Design:

  • Agent has single, clear responsibility
  • Agent implements run(state) -> state interface
  • Dependencies injected via constructor
  • No instance state between invocations (stateless)

State Management:

  • Reads inputs using state.get(key, default)
  • Adds new keys to state (doesn't overwrite critical keys)
  • Returns modified state (doesn't mutate in-place)
  • All state values are msgpack-serializable (no Gradio components, file handles, etc.)

Error Handling:

  • Never raises exceptions from run()
  • Catches all exceptions and logs with exc_info=True
  • Appends errors to state["errors"]
  • Returns partial/degraded results on failure
  • Uses confidence scores to signal quality

Pydantic Schemas:

  • Output data modeled with Pydantic classes
  • Schema includes validation (Field with constraints)
  • Normalization method handles malformed LLM responses
  • Schema added to utils/schemas.py and imported in AgentState

LLM Configuration (if applicable):

  • Uses temperature=0.0 for deterministic outputs
  • Uses response_format={"type": "json_object"} for structured data
  • Sets appropriate timeout (60s for analysis, 90s for synthesis)
  • Sets appropriate max_tokens limit
  • Tracks token usage in state["token_usage"]

Observability:

  • run() method decorated with @observe
  • Uses as_type="generation" for LLM calls, as_type="span" for data processing
  • Structured logging with INFO/WARNING/ERROR levels
  • Logs start/completion with metrics (count, duration, etc.)

Performance:

  • Uses parallel processing if applicable (ThreadPoolExecutor)
  • Implements circuit breaker if making repeated external calls
  • Uses batch operations where possible (embeddings, database)
  • Appropriate timeout configuration

Testing:

  • Test suite in tests/test_<agent_name>.py
  • Tests cover: success, empty input, missing keys, dependency failures
  • Uses mocks for external dependencies
  • Tests verify state transformations
  • Tests verify error handling (no exceptions raised)

Integration:

  • Node wrapper created in orchestration/nodes.py
  • Agent added to workflow graph in orchestration/workflow_graph.py
  • Conditional routing added if needed
  • Agent initialized in app.py
  • AgentState TypedDict updated with new state keys

Documentation:

  • Docstrings for class and all public methods
  • Type hints for all parameters and returns
  • Comments for complex logic
  • Example added to AGENTS.md (this document)

7. Agent Comparison Reference

Quick reference table comparing all agents:

| Aspect | RetrieverAgent | AnalyzerAgent | SynthesisAgent | CitationAgent |
|---|---|---|---|---|
| File | agents/retriever.py | agents/analyzer.py | agents/synthesis.py | agents/citation.py |
| Primary Task | Search arXiv, download PDFs, chunk, embed | Analyze individual papers with RAG | Cross-paper synthesis | Generate citations, validate, cost calculation |
| Input State Keys | query, category, num_papers | papers | papers, analyses, query | synthesis, papers, token_usage, model_desc |
| Output State Keys | papers, chunks, token_usage[embedding_tokens] | analyses, token_usage[input/output_tokens] | synthesis, token_usage[input/output_tokens] | validated_output |
| External APIs | arXiv API (or MCP), Azure OpenAI (embeddings) | Azure OpenAI (LLM) | Azure OpenAI (LLM) | None |
| LLM Calls | No (only embeddings) | Yes (one per paper) | Yes (one for all papers) | No |
| Model | text-embedding-3-small | gpt-4o-mini (configurable) | gpt-4o-mini (configurable) | N/A |
| Temperature | N/A | 0.0 | 0.0 | N/A |
| Timeout | 30s (download), 60s (embedding) | 60s | 90s | N/A |
| Parallel Processing | No | Yes (ThreadPoolExecutor, 4 workers) | No | No |
| Observability Type | generation (includes embeddings) | generation (LLM-heavy) | generation (LLM) | span (data only) |
| Error Handling | Two-tier fallback, partial success | Circuit breaker, minimal analysis (confidence=0.0) | Paper count mismatch, empty synthesis (confidence=0.0) | Rare failures (pure data transformation) |
| Confidence Scoring | N/A | Based on RAG context quality | Based on synthesis completeness | N/A |
| Main Dependencies | ArxivClient, PDFProcessor, EmbeddingGenerator, VectorStore | RAGRetriever, AzureOpenAI | RAGRetriever, AzureOpenAI | PricingConfig |
| Failure Mode | Returns empty papers/chunks, appends errors | Returns confidence=0.0 analyses | Returns empty synthesis, confidence=0.0 | Returns original state, appends errors |
| Cost Impact | Embedding tokens (~$0.01 per 100k tokens) | Input/output tokens (~$0.15-$0.60 per 1M tokens) | Input/output tokens (~$0.15-$0.60 per 1M tokens) | None (calculates cost, doesn't incur) |
| Typical Duration | 5-15s (download + embed) | 30-60s (parallel, 4 papers) | 10-20s (single synthesis) | <1s |
| State Mutation | Adds papers, chunks | Adds analyses | Adds synthesis | Adds validated_output |
| Thread Safety | N/A | Yes (token_lock for shared counter) | N/A | N/A |
| Deterministic | Yes (fixed search results, deterministic embeddings) | Yes (temperature=0) | Yes (temperature=0) | Yes |

8. Troubleshooting and Debugging

Common Issues and Solutions

Issue 1: msgpack Serialization Error

Symptom:

TypeError: can't serialize <class 'gradio.Progress'>

Cause: Non-serializable object added to state (Gradio Progress, file handles, callbacks)

Solution:

  1. Never add complex objects to state
  2. Keep them as local variables instead
  3. See BUGFIX_MSGPACK_SERIALIZATION.md for detailed fix

Example Fix:

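# WRONG
def run_workflow(workflow_app, initial_state, config, progress):
    initial_state["progress"] = progress  # ❌ non-serializable object stored in state
    return workflow_app.invoke(initial_state, config)

# CORRECT
def run_workflow(workflow_app, initial_state, config, progress):
    # Keep progress as a local variable; it never enters the state
    final_state = initial_state
    # stream_mode="values" yields the full state after each step
    for snapshot in workflow_app.stream(initial_state, config, stream_mode="values"):
        final_state = snapshot
        if progress:
            progress(0.5, desc="Processing...")  # ✅
    return final_state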
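To catch this class of error before the workflow starts, the state can be scanned for values that are not plain data. The checker below is a hypothetical sketch, not part of the codebase. It assumes Pydantic-style models (anything exposing `model_dump`) are acceptable, since the existing state already carries them, and it does not call msgpack itself.

```python
from typing import Any, List

PLAIN_TYPES = (str, int, float, bool, bytes, type(None))


def find_unserializable(value: Any, path: str = "state") -> List[str]:
    """Return paths of values that are neither plain data nor Pydantic-style models."""
    if isinstance(value, PLAIN_TYPES) or hasattr(value, "model_dump"):
        return []
    if isinstance(value, dict):
        return [p for k, v in value.items()
                for p in find_unserializable(v, f"{path}[{k!r}]")]
    if isinstance(value, (list, tuple)):
        return [p for i, v in enumerate(value)
                for p in find_unserializable(v, f"{path}[{i}]")]
    return [f"{path} ({type(value).__name__})"]  # anything else is suspicious


class FakeProgress:
    """Stand-in for a UI callback object such as gradio.Progress."""


state = {"query": "transformers", "errors": [], "progress": FakeProgress()}
print(find_unserializable(state))
```

Running such a check on `initial_state` in development turns a late serialization failure into an early, readable message naming the offending key.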

Issue 2: All Analyses Filtered Out

Symptom:

WARNING: No valid analyses after filtering. Ending workflow.

Cause: All analyses have confidence_score < 0.7 (filter threshold)

Root Causes:

  • RAG retrieval failed (no chunks found)
  • LLM returned malformed JSON repeatedly
  • Circuit breaker triggered after 2 failures

Debugging Steps:

  1. Check LangFuse traces: See which papers failed

    from observability import TraceReader
    
    reader = TraceReader()
    traces = reader.get_traces(session_id="session-abc123")
    analyzer_spans = reader.filter_by_agent(traces, "analyzer_agent")
    
    for span in analyzer_spans:
        print(f"Paper: {span.metadata.get('arxiv_id')}")
        print(f"Confidence: {span.metadata.get('confidence_score')}")
    
  2. Check RAG retrieval: Verify chunks were found

    # In analyzer_agent.py, add logging
    logger.info(f"Retrieved {len(unique_chunks)} chunks for {paper.arxiv_id}")
    
  3. Lower filter threshold temporarily:

    # orchestration/nodes.py:77
    threshold = 0.5  # Lower from 0.7 to accept more analyses
    
  4. Check circuit breaker:

    # agents/analyzer.py
    logger.error(f"Circuit breaker triggered after {self.consecutive_failures} failures")
    # If you see this, investigate first 2 failures
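For readers unfamiliar with the pattern, a consecutive-failure circuit breaker can be sketched as follows. This is a simplified, sequential illustration under the assumption of a threshold of 2; the real logic in `agents/analyzer.py` runs in parallel and is not reproduced here. `CircuitBreaker`, `analyze_all`, and `flaky` are hypothetical names.

```python
from typing import Callable, Iterable, List, Optional


class CircuitBreaker:
    """Opens after `max_failures` consecutive failures; a success resets the count."""

    def __init__(self, max_failures: int = 2) -> None:
        self.max_failures = max_failures
        self.consecutive_failures = 0

    @property
    def is_open(self) -> bool:
        return self.consecutive_failures >= self.max_failures

    def record(self, success: bool) -> None:
        self.consecutive_failures = 0 if success else self.consecutive_failures + 1


def analyze_all(papers: Iterable[str], analyze: Callable[[str], str]) -> List[Optional[str]]:
    """Analyze papers in order, skipping the rest once the breaker opens."""
    breaker = CircuitBreaker(max_failures=2)
    results: List[Optional[str]] = []
    for paper in papers:
        if breaker.is_open:
            results.append(None)  # skipped: no call is made
            continue
        try:
            results.append(analyze(paper))
            breaker.record(success=True)
        except Exception:
            results.append(None)
            breaker.record(success=False)
    return results


def flaky(paper: str) -> str:
    if paper.startswith("bad"):
        raise RuntimeError("LLM returned malformed JSON")
    return f"analysis of {paper}"


print(analyze_all(["p1", "bad1", "bad2", "p4"], flaky))
```

Note that `p4` is never attempted even though it would have succeeded. That is why the first two failures are the ones worth investigating: they determine whether every later paper gets analyzed at all.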
    

Issue 3: Retriever Returns No Papers

Symptom:

WARNING: No papers retrieved. Ending workflow.

Cause: arXiv search returned no results (or primary/fallback clients both failed)

Debugging Steps:

  1. Check query and category:

    logger.info(f"Searching arXiv: query='{query}', category='{category}'")
    # Verify query is reasonable and category is valid (e.g., 'cs.AI', not 'AI')
    
  2. Test arXiv search manually:

    # In terminal
    python -c "import arxiv; print(list(arxiv.Client().results(arxiv.Search(query='transformer', max_results=3))))"
    
  3. Check fallback client:

    # agents/retriever.py:69-97
    logger.warning(f"Primary client failed: {str(e)}, trying fallback...")
    # If you see this, primary client (MCP) is failing
    
  4. Disable MCP temporarily:

    # .env
    USE_MCP_ARXIV=false  # Force direct arXiv API
    

Issue 4: Synthesis Returns Empty Results

Symptom:

{
    "consensus_points": [],
    "contradictions": [],
    "research_gaps": [],
    "summary": ""
}

Cause: LLM returned empty synthesis (or normalization stripped all data)

Debugging Steps:

  1. Check LangFuse trace for synthesis LLM call:

    • View full prompt sent to LLM
    • View full completion received
    • Check if completion was actually empty or normalization failed
  2. Verify paper summaries in prompt:

    # agents/synthesis.py:54-133
    logger.debug(f"Synthesis prompt:\n{prompt}")
    # Check if paper summaries are actually populated
    
  3. Check normalization:

    # agents/synthesis.py:135-196
    logger.debug(f"Raw LLM response: {data}")
    logger.debug(f"Normalized response: {normalized}")
    # Verify normalization isn't stripping valid data
    
  4. Increase max_tokens:

    # agents/synthesis.py:280
    max_tokens=3000  # Increase from default if synthesis is cut off
    

Issue 5: Cost Estimate is $0.00

Symptom:

Cost: $0.0000

Cause: Token usage not tracked properly

Debugging Steps:

  1. Check token_usage in state:

    logger.info(f"Token usage: {state['token_usage']}")
    # Should show non-zero input_tokens, output_tokens, embedding_tokens
    
  2. Verify agents are updating token_usage:

    # AnalyzerAgent should do:
    state["token_usage"]["input_tokens"] = self.total_input_tokens
    
    # SynthesisAgent should do:
    state["token_usage"]["input_tokens"] += response.usage.prompt_tokens
    
  3. Check pricing configuration:

    from utils.config import get_pricing_config
    
    pricing = get_pricing_config()
    print(pricing.get_model_pricing("gpt-4o-mini"))
    # Should return {"input": 0.15, "output": 0.60} per 1M tokens
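As a sanity check on the arithmetic, the cost for a run can be reproduced by hand from the token counts and the per-1M-token prices quoted above. `estimate_cost` is a hypothetical helper, not the CitationAgent's actual method, and it ignores embedding tokens for brevity.

```python
from typing import Dict


def estimate_cost(input_tokens: int, output_tokens: int, pricing: Dict[str, float]) -> float:
    """Cost in USD, given prices expressed per 1M tokens."""
    return (input_tokens * pricing["input"] + output_tokens * pricing["output"]) / 1_000_000


pricing = {"input": 0.15, "output": 0.60}  # USD per 1M tokens
cost = estimate_cost(input_tokens=100_000, output_tokens=20_000, pricing=pricing)
print(f"Cost: ${cost:.4f}")
```

Here 100,000 input tokens contribute $0.015 and 20,000 output tokens contribute $0.012. If the reported cost is exactly zero while the pricing lookup succeeds, the token counters are the likely culprit.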
    

Reading LangFuse Traces

Accessing LangFuse:

  1. Web UI: https://cloud.langfuse.com (or self-hosted URL)
  2. Python API:
    from observability import TraceReader
    
    reader = TraceReader()
    traces = reader.get_traces(limit=10)
    

Trace Structure:

Trace (session-abc123)
│
├─ Span: retriever_agent
│  ├─ Generation: retriever_agent_run
│  └─ Generation: embeddings (Azure OpenAI)
│
├─ Span: analyzer_agent
│  ├─ Generation: analyzer_agent_run
│  ├─ Generation: LLM Call 1 (paper 1)
│  ├─ Generation: LLM Call 2 (paper 2)
│  └─ Span: rag_retrieve
│
├─ Span: filter_low_confidence
│
├─ Span: synthesis_agent
│  ├─ Generation: synthesis_agent_run
│  └─ Generation: LLM Call (synthesis)
│
└─ Span: citation_agent
   └─ Span: citation_agent_run

What to Look For:

1. Execution Duration:

  • Span duration = total time including child spans
  • Generation duration = time for single LLM call
  • Look for slow spans (>60s) indicating bottlenecks

2. Token Usage:

  • Generations show usage.prompt_tokens and usage.completion_tokens
  • High token usage = higher cost
  • Unusually low tokens may indicate truncation

3. Errors:

  • Spans with level: ERROR indicate failures
  • Check metadata.error for exception details
  • Trace errors back to specific papers/operations

4. LLM Prompts/Completions:

  • Click on Generation to see full prompt and completion
  • Verify prompt includes expected context
  • Check if completion is valid JSON

Example Query:

from observability import TraceReader, AgentPerformanceAnalyzer

reader = TraceReader()
analyzer = AgentPerformanceAnalyzer()

# Get failed traces
traces = reader.get_traces(limit=100)
failed_traces = [t for t in traces if t.status == "ERROR"]

print(f"Failed traces: {len(failed_traces)}/{len(traces)}")

# Analyze analyzer latency
stats = analyzer.agent_latency_stats("analyzer_agent", days=7)
print(f"Analyzer P95 latency: {stats.p95_latency_ms:.2f}ms")

# Check error rates
error_rates = analyzer.error_rates(days=7)
for agent, rate in error_rates.items():
    print(f"{agent}: {rate:.1%} error rate")

State Inspection Techniques

During Development (in agent code):

# agents/analyzer.py
def run(self, state: Dict[str, Any]) -> Dict[str, Any]:
    # Print state keys at entry
    logger.debug(f"State keys: {state.keys()}")

    # Print specific values
    papers = state.get("papers", [])
    logger.debug(f"Received {len(papers)} papers: {[p.arxiv_id for p in papers]}")

    # ... agent logic ...

    # Print state changes before return
    logger.debug(f"Returning {len(state.get('analyses', []))} analyses")

    return state

In Gradio UI (during workflow execution):

# app.py
final_state = run_workflow(workflow_app, initial_state, config, progress)

# Inspect final state
print(f"Final state keys: {final_state.keys()}")
print(f"Papers: {len(final_state.get('papers', []))}")
print(f"Analyses: {len(final_state.get('analyses', []))}")
print(f"Errors: {final_state.get('errors', [])}")
print(f"Token usage: {final_state.get('token_usage', {})}")

Using Checkpointer (post-execution):

# Get state at specific checkpoint
from orchestration.workflow_graph import get_workflow_state

thread_id = "session-abc123"
state_after_analyzer = get_workflow_state(workflow_app, thread_id, checkpoint_id="after-analyzer")

print(f"Analyses after analyzer: {len(state_after_analyzer.get('analyses', []))}")

# Compare with state after filter
state_after_filter = get_workflow_state(workflow_app, thread_id, checkpoint_id="after-filter")
print(f"Analyses after filter: {len(state_after_filter.get('filtered_analyses', []))}")
print(f"Filtered out: {len(state_after_analyzer['analyses']) - len(state_after_filter['filtered_analyses'])}")

Log Analysis Patterns

Log Levels:

  • INFO: Normal workflow progress (agent start/completion, counts)
  • WARNING: Recoverable issues (fallback triggered, empty results, low confidence)
  • ERROR: Failures (exceptions caught, agent failures, API errors)
  • DEBUG: Detailed debugging (state contents, intermediate values)
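The effect of the levels can be seen with a small standard-library setup. The application's actual logging configuration is not shown in this document, so the format string and logger name below are assumptions chosen to resemble the sample log lines in this section. An in-memory stream stands in for `app.log`.

```python
import io
import logging

stream = io.StringIO()  # stands in for app.log
handler = logging.StreamHandler(stream)
handler.setFormatter(logging.Formatter("%(levelname)s: %(message)s"))

logger = logging.getLogger("demo.agents")
logger.setLevel(logging.INFO)  # DEBUG messages are dropped at this level
logger.addHandler(handler)
logger.propagate = False       # keep the demo output out of the root logger

logger.debug("State keys: ['papers', 'chunks']")
logger.info("Analyzer completed. Analyses: 5")
logger.warning("Primary client failed, trying fallback...")
logger.error("Summarizer failed: timeout")

print(stream.getvalue())
```

Switching the level to `logging.DEBUG` during an investigation surfaces the state-inspection messages without any code changes in the agents.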

Useful Log Patterns:

1. Track Workflow Progress:

# In terminal, tail logs and grep for agent completions
tail -f app.log | grep "completed"

# Output:
# INFO: Retriever completed. Papers: 5, Chunks: 237
# INFO: Analyzer completed. Analyses: 5
# INFO: Filter completed. Valid: 4/5
# INFO: Synthesis completed. Consensus: 3, Contradictions: 1
# INFO: Citation completed. Cost: $0.0234

2. Identify Failures:

# Grep for ERROR logs
grep "ERROR" app.log | tail -20

# Analyze common failure patterns
grep "ERROR" app.log | cut -d':' -f4- | sort | uniq -c | sort -rn

3. Track Fallback Usage:

# Check how often fallback client is used
grep "trying fallback" app.log | wc -l
grep "Searching with fallback client" app.log | wc -l

4. Monitor Circuit Breaker:

# Check if circuit breaker is triggering
grep "Circuit breaker triggered" app.log

# If found, investigate what caused consecutive failures
grep "consecutive_failures" app.log

5. Analyze Token Usage:

# Extract token usage from logs
grep "Token usage" app.log | tail -10

# Calculate total cost
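# Strip the leading "$" first; awk would otherwise read "$0.0234" as 0
grep "Cost:" app.log | awk '{gsub(/\$/, "", $NF); sum += $NF} END {printf "Total: $%.4f\n", sum}'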
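The same total can be computed in Python, which makes the handling of the leading `$` explicit. This is a minimal sketch that assumes cost lines look like the `Cost: $0.0234` sample shown earlier; `total_cost` and `COST_PATTERN` are hypothetical names.

```python
import re
from typing import Iterable

COST_PATTERN = re.compile(r"Cost: \$(\d+(?:\.\d+)?)")


def total_cost(lines: Iterable[str]) -> float:
    """Sum every 'Cost: $X' amount found in the given log lines."""
    return sum(float(m.group(1)) for line in lines for m in COST_PATTERN.finditer(line))


sample_log = [
    "INFO: Synthesis completed. Consensus: 3, Contradictions: 1",
    "INFO: Citation completed. Cost: $0.0234",
    "INFO: Citation completed. Cost: $0.0100",
]
print(f"Total: ${total_cost(sample_log):.4f}")
```

To run it against a real file, pass an open file handle, for example `total_cost(open("app.log"))`.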

Appendix: File Reference

Agent Implementations:

  • agents/retriever.py - RetrieverAgent with fallback mechanisms
  • agents/analyzer.py - AnalyzerAgent with parallel processing and circuit breaker
  • agents/synthesis.py - SynthesisAgent with cross-paper analysis
  • agents/citation.py - CitationAgent with APA formatting and cost calculation

Orchestration:

  • orchestration/__init__.py - Module exports
  • orchestration/nodes.py - Node wrappers with tracing and error handling
  • orchestration/workflow_graph.py - LangGraph workflow builder and execution

State Management:

  • utils/langgraph_state.py - AgentState TypedDict and initialization helpers
  • utils/schemas.py - Pydantic models for all data structures

Observability:

  • utils/langfuse_client.py - LangFuse client initialization and @observe decorator
  • observability/trace_reader.py - Trace querying and export API
  • observability/analytics.py - Performance analytics and trajectory analysis

Configuration:

  • utils/config.py - Pricing configuration and environment variables
  • .env.example - Environment variable template

Documentation:

  • CLAUDE.md - Comprehensive system-wide developer guide
  • AGENTS.md - This document (agent architecture deep-dive)
  • REFACTORING_SUMMARY.md - LangGraph + LangFuse refactoring details
  • BUGFIX_MSGPACK_SERIALIZATION.md - msgpack serialization fix
  • observability/README.md - Observability documentation

Document Maintenance

Last Updated: 2025-12-20

Version: 1.0

Authors: Claude Sonnet 4.5 (auto-generated from codebase exploration)

Changelog:

  • 2025-12-20: Initial creation with comprehensive agent documentation

Contributing:

  • When adding new agents, update Section 3 (Individual Agent Deep Dives)
  • When adding new patterns, update Section 4 (Cross-Cutting Patterns)
  • When modifying workflow, update Section 5 (Workflow Orchestration)
  • Keep Agent Comparison Reference (Section 7) in sync with agent changes

End of AGENTS.md