"""Portfolio Intelligence Platform - Gradio Interface. Main application file for the hackathon demo. Features: - Single-page workflow (input → loading → results) - Unified dashboard with responsive bento grid layout - Auto-refreshing visualisations - Full-screen responsive design - Agent reasoning transparency """ import os import warnings from dotenv import load_dotenv # Load environment variables first (before any other imports) load_dotenv() # Configure logging early (before other imports create loggers) # Controlled via environment variables: # LOG_LEVEL: DEBUG, INFO, WARNING, ERROR, CRITICAL, OFF # LOGGING_ENABLED: true/false from backend.logging_config import configure_logging configure_logging() # Disable telemetry on HuggingFace Spaces if os.environ.get("SPACE_ID"): os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" os.environ["HF_HUB_DISABLE_TELEMETRY"] = "true" # Now import remaining modules import gradio as gr import asyncio import re import logging import time import random import numpy as np import traceback import sys from typing import List, Dict, Any, Tuple, Optional from datetime import datetime # Initialise Sentry monitoring early (before other imports that might raise errors) from backend.monitoring import initialise_sentry initialise_sentry() from backend.mcp_router import mcp_router from backend import mcp_tools from backend.agents.workflow import PortfolioAnalysisWorkflow from backend.models.agent_state import AgentState from backend.database import db from backend.theme import get_financial_theme, FINANCIAL_CSS from backend.visualizations import ( create_portfolio_allocation_chart, create_risk_metrics_dashboard, create_performance_chart, create_correlation_heatmap, create_optimization_comparison, ) from backend.stress_testing import ( PortfolioStressTest, STRESS_SCENARIOS, create_monte_carlo_paths_plot, create_scenario_comparison_chart, create_drawdown_analysis_chart, create_stress_test_dashboard, ) from backend.agents.personas import get_available_personas from backend.tax.interface import create_tax_analysis, format_tax_analysis_output from backend.config import settings from backend.rate_limiting import ( GradioRateLimitMiddleware, UserTier, ) from backend.rate_limiting.fixed_window import TieredFixedWindowLimiter from backend.auth import auth, UserSession from backend.export import export_analysis_to_csv, export_analysis_to_pdf # Import ensemble predictor at startup so @spaces.GPU decorator is detected by ZeroGPU # This module contains GPU-accelerated ML forecasting (Chronos, TTM, N-HiTS) import backend.mcp_servers.ensemble_predictor_mcp # noqa: F401 def check_authentication(session_state: Dict) -> bool: """Check if user is authenticated or in demo mode.""" if not session_state: return False # Allow demo mode (rate-limited anonymous access) if session_state.get("is_demo", False): return True # Check regular authentication session = UserSession.from_dict(session_state) return session is not None and session.user_id is not None # Suppress websockets deprecation warnings # Note: websockets 15.0+ deprecated WebSocketServerProtocol used by uvicorn 0.38.0 # This is a known issue with no functionality impact. The legacy API is supported until 2030. # To eliminate warnings: upgrade to uvicorn 0.35.0+ when available with SansIO implementation # See: https://websockets.readthedocs.io/en/stable/project/changelog.html warnings.filterwarnings("ignore", category=DeprecationWarning, module="websockets.legacy") warnings.filterwarnings("ignore", category=DeprecationWarning, module="websockets.server") # Suppress Gradio internal deprecation warnings warnings.filterwarnings("ignore", category=DeprecationWarning, module="gradio.routes") # Get logger for this module (logging already configured via configure_logging()) logger = logging.getLogger(__name__) # Initialize workflow workflow = PortfolioAnalysisWorkflow(mcp_router) # Custom get_user_tier function for session-aware rate limiting def get_user_tier_from_session(request: Optional[gr.Request], session_state: Optional[dict] = None): """Determine user tier from session state. Args: request: Gradio request object session_state: Session state dict containing user authentication info Returns: Tuple of (identifier, tier) - For authenticated users: (user_id, AUTHENTICATED) - For demo mode: (ip_hash, ANONYMOUS) """ import hashlib # Check if user is authenticated via session state if session_state and isinstance(session_state, dict): user_id = session_state.get("user_id") is_demo = session_state.get("is_demo", False) # Authenticated user: use user_id for rate limiting if user_id and not is_demo: return str(user_id), UserTier.AUTHENTICATED # Demo mode or unauthenticated: use IP-based rate limiting # Extract client IP from request client_ip = "unknown" if request: try: if hasattr(request, "client") and request.client: if hasattr(request.client, "host"): client_ip = request.client.host elif isinstance(request.client, str): client_ip = request.client # Check headers for forwarded IPs (behind proxy) if hasattr(request, "headers"): forwarded = request.headers.get("X-Forwarded-For") if forwarded: client_ip = forwarded.split(",")[0].strip() except Exception as e: logger.warning(f"Error extracting client IP: {e}") # Hash IP for privacy identifier = hashlib.sha256(client_ip.encode()).hexdigest()[:16] return identifier, UserTier.ANONYMOUS # Initialize rate limiter rate_limiter = None rate_limit_middleware = None if settings.rate_limit_enabled: try: rate_limiter = TieredFixedWindowLimiter( tier_limits={ UserTier.ANONYMOUS: settings.rate_limit_anonymous_capacity, UserTier.AUTHENTICATED: settings.rate_limit_authenticated_capacity, UserTier.PREMIUM: settings.rate_limit_premium_capacity, }, redis_url=settings.redis_url ) rate_limit_middleware = GradioRateLimitMiddleware( rate_limiter, get_user_tier=get_user_tier_from_session ) logger.info("Rate limiting enabled with fixed window (daily reset at midnight UTC)") except Exception as e: logger.error(f"Failed to initialise rate limiter: {e}") logger.warning("Continuing without rate limiting") else: logger.info("Rate limiting disabled") # Global state for visualisations LAST_ANALYSIS_STATE = None HISTORY_RECORDS = [] # Stores loaded history records for row selection LAST_STRESS_TEST = None LAST_EXPORT_PDF_PATH = None # Pre-generated PDF export path LAST_EXPORT_CSV_PATH = None # Pre-generated CSV export path # Global state for audio generation LAST_ANALYSIS_TEXT = None # Stores analysis text for audio generation LAST_BUILD_RESULT = None # Stores build portfolio result for audio LAST_DEBATE_DATA = None # Stores debate data for audio simulation # Loading screen rotating messages with MCP phases and disclaimers LOADING_MESSAGES = [ "MCP Workflow: Initialising Model Context Protocol servers...", "Disclaimer: This is NOT financial advice - consult a professional!", "MCP: Aggregating context from multiple data sources...", "Humour: If in doubt, blame the algorithm!", "MCP Workflow: Processing portfolio through context pipeline...", "Warning: Past performance does not guarantee future results!", "MCP: Routing requests to specialised analysis servers...", "Remember: Markets can stay irrational longer than you can stay solvent!", "MCP Workflow: Extracting insights from financial context...", "Important: Not a substitute for professional financial guidance!", "MCP: Coordinating between data, analysis, and reporting contexts...", "Pro tip: Diversification - the only free lunch in investing!", "Phase 1: Fetching market data from Yahoo Finance, FMP, FRED...", "Phase 2: Running portfolio optimisation and risk analysis...", "Phase 3: Generating AI insights with Claude Sonnet 4.5...", "Data Collection: Retrieving real-time quotes and historical prices...", "Computation: Calculating portfolio metrics and risk indicators...", "Analysis: Synthesising comprehensive portfolio recommendations...", ] def parse_portfolio_input(portfolio_text: str) -> List[Dict[str, Any]]: """Parse portfolio input text into structured holdings. Supports formats: - AAPL 50 (50 shares) - TSLA 25 shares - NVDA $5000 (dollar amount) - BTC 0.5 (fractional shares/crypto) Args: portfolio_text: Raw text input from user Returns: List of holding dictionaries """ holdings = [] lines = portfolio_text.strip().split('\n') for line in lines: line = line.strip() if not line: continue # Match patterns: TICKER QUANTITY [shares] or TICKER $AMOUNT match = re.match(r'([A-Za-z]+)\s+(\$)?([0-9.]+)\s*(shares)?', line, re.IGNORECASE) if match: ticker = match.group(1).upper() is_dollar = match.group(2) == '$' amount = float(match.group(3)) holdings.append({ 'ticker': ticker, 'quantity': amount if not is_dollar else 0, 'dollar_amount': amount if is_dollar else 0, 'cost_basis': 0 }) else: logger.warning(f"Could not parse line: {line}") return holdings def aggregate_holdings(holdings: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: """Aggregate holdings by ticker while preserving lot-level data. Groups multiple entries for the same ticker together and tracks individual tax lots. Useful for handling duplicate entries and mixed entry types (shares + dollars). Args: holdings: List of holding dictionaries from parse_portfolio_input Returns: Dictionary with ticker as key and aggregated data as value, containing: ticker, total_quantity, total_dollar_amount, lots, warnings """ aggregated = {} for holding in holdings: ticker = holding['ticker'] if ticker not in aggregated: aggregated[ticker] = { 'ticker': ticker, 'total_quantity': 0, 'total_dollar_amount': 0, 'lots': [], 'warnings': [] } aggregated[ticker]['total_quantity'] += holding.get('quantity', 0) aggregated[ticker]['total_dollar_amount'] += holding.get('dollar_amount', 0) aggregated[ticker]['lots'].append(holding) # Check for duplicates and mixed entry types for ticker, data in aggregated.items(): if len(data['lots']) > 1: has_shares = any(lot['quantity'] > 0 for lot in data['lots']) has_dollars = any(lot['dollar_amount'] > 0 for lot in data['lots']) if has_shares and has_dollars: data['warnings'].append( f"{ticker} has mixed entry types (shares and dollars). " f"Total: {data['total_quantity']:.2f} sh + ${data['total_dollar_amount']:,.2f}" ) else: data['warnings'].append( f"{ticker} appears {len(data['lots'])} times. " f"Aggregated: {data['total_quantity']:.2f} shares" if has_shares else f"Aggregated: ${data['total_dollar_amount']:,.2f}" ) return aggregated async def run_analysis( portfolio_text: str, roast_mode: bool = False, persona: Optional[str] = None ) -> str: """Run portfolio analysis workflow. Args: portfolio_text: Raw portfolio input roast_mode: If True, use brutal honesty mode persona: Optional investor persona (e.g., 'warren_buffett') Returns: Formatted analysis result """ global LAST_ANALYSIS_STATE if not portfolio_text.strip(): return "❌ Please enter your portfolio holdings" try: holdings = parse_portfolio_input(portfolio_text) if not holdings: return "❌ Could not parse portfolio. Please use format: TICKER QUANTITY" logger.info( f"Starting analysis for {len(holdings)} holdings " f"(Roast Mode: {roast_mode}, Persona: {persona or 'None'})" ) # Construct initial state portfolio_id = f"demo_{datetime.now().strftime('%Y%m%d_%H%M%S')}" initial_state: AgentState = { 'portfolio_id': portfolio_id, 'user_query': 'Analyse my portfolio', 'risk_tolerance': 'moderate', 'holdings': holdings, 'historical_prices': {}, 'fundamentals': {}, 'economic_data': {}, 'realtime_data': {}, 'technical_indicators': {}, 'optimisation_results': {}, 'risk_analysis': {}, 'ai_synthesis': '', 'recommendations': [], 'reasoning_steps': [], 'current_step': 'starting', 'errors': [], 'mcp_calls': [], 'phase_1_duration_ms': None, 'phase_1_5_duration_ms': None, 'phase_2_duration_ms': None, 'phase_2_5_duration_ms': None, 'phase_3_duration_ms': None, 'llm_input_tokens': None, 'llm_output_tokens': None, 'llm_total_tokens': None, 'llm_request_count': None, 'feature_vectors': {}, 'ensemble_forecasts': {}, 'sentiment_data': {}, } # Create workflow with persona or roast mode # Note: persona takes precedence over roast mode if persona and persona != "standard": analysis_workflow = PortfolioAnalysisWorkflow(mcp_router, persona=persona) else: analysis_workflow = PortfolioAnalysisWorkflow(mcp_router, roast_mode=roast_mode) # Run workflow final_state = await analysis_workflow.run(initial_state) LAST_ANALYSIS_STATE = final_state if final_state.get("errors"): return f"❌ Analysis failed: {'; '.join(final_state['errors'])}" # Format result return format_analysis_result(final_state, holdings) except Exception as e: logger.error(f"Analysis error: {e}", exc_info=True) return f"❌ Error during analysis: {str(e)}" def format_performance_metrics(final_state: AgentState) -> str: """Format performance metrics for reasoning display. Args: final_state: Final AgentState from workflow Returns: Formatted markdown string with performance metrics """ metrics_md = "### Performance Metrics\n\n" # Phase Execution Times phase_1_ms = final_state.get("phase_1_duration_ms", 0) or 0 phase_2_ms = final_state.get("phase_2_duration_ms", 0) or 0 phase_2_5_ms = final_state.get("phase_2_5_duration_ms", 0) or 0 phase_3_ms = final_state.get("phase_3_duration_ms", 0) or 0 total_ms = phase_1_ms + phase_2_ms + phase_2_5_ms + phase_3_ms metrics_md += "**Execution Timeline:**\n\n" metrics_md += f"- Phase 1 (Data Collection): {phase_1_ms:,}ms\n" metrics_md += f"- Phase 2 (Computation): {phase_2_ms:,}ms\n" if phase_2_5_ms > 0: metrics_md += f"- Phase 2.5 (ML Predictions): {phase_2_5_ms:,}ms\n" metrics_md += f"- Phase 3 (LLM Synthesis): {phase_3_ms:,}ms\n" metrics_md += f"- **Total**: {total_ms:,}ms ({total_ms / 1000:.2f}s)\n\n" # LLM Token Usage input_tokens = final_state.get("llm_input_tokens", 0) or 0 output_tokens = final_state.get("llm_output_tokens", 0) or 0 total_tokens = final_state.get("llm_total_tokens", 0) or 0 request_count = final_state.get("llm_request_count", 0) or 0 # Anthropic Claude Sonnet 4.5 pricing (as of early 2025) # Input: $3 per million tokens, Output: $15 per million tokens input_cost = (input_tokens / 1_000_000) * 3.0 output_cost = (output_tokens / 1_000_000) * 15.0 total_cost = input_cost + output_cost metrics_md += "**LLM Token Usage (Claude Sonnet 4.5):**\n\n" metrics_md += f"- Input Tokens: {input_tokens:,} (${input_cost:.4f})\n" metrics_md += f"- Output Tokens: {output_tokens:,} (${output_cost:.4f})\n" metrics_md += f"- **Total**: {total_tokens:,} tokens (${total_cost:.4f})\n" metrics_md += f"- API Requests: {request_count}\n\n" # MCP Calls mcp_calls = final_state.get("mcp_calls", []) if mcp_calls: metrics_md += f"**MCP Operations:** {len(mcp_calls)} total calls\n\n" # Group by MCP server mcp_groups = {} for call in mcp_calls: server = call.get("mcp_server") or call.get("mcp", "unknown") if server not in mcp_groups: mcp_groups[server] = [] mcp_groups[server].append(call) for server, calls in mcp_groups.items(): metrics_md += f"- **{server}**: {len(calls)} call{'s' if len(calls) != 1 else ''}\n" return metrics_md def format_analysis_result(final_state: AgentState, holdings: List[Dict[str, Any]]) -> str: """Format analysis result for display. Args: final_state: Final AgentState from workflow holdings: Original holdings list Returns: Formatted markdown string """ # Calculate total portfolio value total_value = sum(h.get('market_value', 0) for h in final_state.get('holdings', holdings)) # Format holdings holdings_md = "### Portfolio Holdings\n\n" for h in final_state.get('holdings', holdings): ticker = h.get("ticker", "N/A") shares = h.get("quantity", 0) value = h.get("market_value", 0) weight = (value / total_value * 100) if total_value > 0 else 0 # Format differently based on whether entered by shares or dollar amount if shares > 0: holdings_md += f"- **{ticker}**: {shares:.2f} shares | ${value:,.2f} ({weight:.1f}%)\n" else: holdings_md += f"- **{ticker}**: ${value:,.2f} ({weight:.1f}%)\n" # Extract risk metrics from risk_analysis risk_analysis = final_state.get("risk_analysis", {}) # Extract from nested dicts var_95_dict = risk_analysis.get("var_95", {}) var_95 = abs(float(var_95_dict.get("var_absolute", 0))) if isinstance(var_95_dict, dict) else abs(float(var_95_dict)) cvar_95_dict = risk_analysis.get("cvar_95", {}) cvar_95 = abs(float(cvar_95_dict.get("cvar_absolute", 0))) if isinstance(cvar_95_dict, dict) else abs(float(cvar_95_dict)) risk_metrics = risk_analysis.get("risk_metrics", {}) sharpe = float(risk_metrics.get("sharpe_ratio", 0)) volatility = float(risk_metrics.get("volatility_annual", 0)) * 100 # Extract advanced metrics information_ratio = risk_metrics.get("information_ratio") calmar_ratio = risk_metrics.get("calmar_ratio") ulcer_index = risk_metrics.get("ulcer_index") metrics_md = f""" ### Key Metrics - **Portfolio Value**: ${total_value:,.2f} - **Sharpe Ratio**: {sharpe:.2f} - **Volatility**: {volatility:.1f}% (annual) - **VaR (95%)**: ${var_95:,.2f} - **CVaR (95%)**: ${cvar_95:,.2f} """ # Add advanced metrics if available if information_ratio is not None: metrics_md += f"- **Information Ratio**: {float(information_ratio):.2f}\n" if calmar_ratio is not None: metrics_md += f"- **Calmar Ratio**: {float(calmar_ratio):.2f}\n" if ulcer_index is not None: metrics_md += f"- **Ulcer Index**: {float(ulcer_index):.2f}\n" # Format ML forecasts if available forecasts_md = "" ensemble_forecasts = final_state.get("ensemble_forecasts", {}) if ensemble_forecasts: forecasts_md = "### ML Price Forecasts (30-day)\n\n" for ticker, forecast_data in ensemble_forecasts.items(): if isinstance(forecast_data, dict): models_used = forecast_data.get("models_used", []) num_models = len(models_used) if models_used else forecast_data.get("metadata", {}).get("num_models", "N/A") # Get first and last prediction (current vs 30-day) predictions = forecast_data.get("predictions", []) if predictions and len(predictions) > 0: first_pred = float(predictions[0]) last_pred = float(predictions[-1]) change_pct = ((last_pred - first_pred) / first_pred * 100) if first_pred > 0 else 0 direction = "📈" if change_pct > 0 else "📉" if change_pct < 0 else "➡️" forecasts_md += f"- **{ticker}**: {direction} {change_pct:+.1f}% (using {num_models} models)\n" forecasts_md += "\n*Note: Forecasts combine Chronos foundation model with statistical baselines.*\n\n" # Get LLM synthesis and recommendations ai_synthesis = final_state.get("ai_synthesis", "Analysis completed successfully.") recommendations = final_state.get("recommendations", []) recs_md = "### Recommendations\n\n" if recommendations: for i, rec in enumerate(recommendations, 1): recs_md += f"{i}. {rec}\n" else: recs_md += "*No specific recommendations generated.*\n" # Final output output = f"""{holdings_md} {metrics_md} {forecasts_md} {ai_synthesis} {recs_md} --- *Interactive charts and detailed metrics are displayed below.* """ return output def create_visualisations() -> Tuple: """Create all visualisation charts from last analysis. Returns: Tuple of (allocation, risk, performance, correlation, optimization) plots """ global LAST_ANALYSIS_STATE if not LAST_ANALYSIS_STATE: empty_fig = None return (empty_fig, empty_fig, empty_fig, empty_fig, empty_fig) # Extract data from AgentState holdings = LAST_ANALYSIS_STATE.get("holdings", []) historical_prices = LAST_ANALYSIS_STATE.get("historical_prices", {}) risk_analysis = LAST_ANALYSIS_STATE.get("risk_analysis", {}) optimisation_results = LAST_ANALYSIS_STATE.get("optimisation_results", {}) # Create charts allocation_chart = create_portfolio_allocation_chart(holdings) if holdings else None # Extract nested risk metrics correctly if risk_analysis: var_95_dict = risk_analysis.get("var_95", {}) var_value = float(var_95_dict.get("var_percentage", 0)) if isinstance(var_95_dict, dict) else float(var_95_dict) cvar_95_dict = risk_analysis.get("cvar_95", {}) cvar_value = float(cvar_95_dict.get("cvar_percentage", 0)) if isinstance(cvar_95_dict, dict) else float(cvar_95_dict) risk_metrics = risk_analysis.get("risk_metrics", {}) sharpe_value = float(risk_metrics.get("sharpe_ratio", 0)) volatility_value = float(risk_metrics.get("volatility_annual", 0)) * 100 risk_chart = create_risk_metrics_dashboard( sharpe=sharpe_value, var=var_value, cvar=cvar_value, volatility=volatility_value ) else: risk_chart = None performance_chart = create_performance_chart(holdings, historical_prices) if historical_prices else None correlation_chart = create_correlation_heatmap(historical_prices) if historical_prices else None optimization_chart = create_optimization_comparison(optimisation_results) if optimisation_results else None return (allocation_chart, risk_chart, performance_chart, correlation_chart, optimization_chart) async def run_stress_test( scenario_name: str, n_simulations: int = 10000, time_horizon_days: int = 252 ) -> Tuple[str, Any, Any, Any, Any]: """Run stress test on current portfolio. Args: scenario_name: Name of stress scenario to apply n_simulations: Number of Monte Carlo simulations time_horizon_days: Simulation time horizon in days Returns: Tuple of (summary_text, dashboard_plot, mc_plot, scenario_plot, drawdown_plot) """ global LAST_ANALYSIS_STATE, LAST_STRESS_TEST if not LAST_ANALYSIS_STATE: return ( "Please run portfolio analysis first before stress testing.", None, None, None, None ) try: # Extract portfolio data holdings = LAST_ANALYSIS_STATE.get("holdings", []) historical_prices = LAST_ANALYSIS_STATE.get("historical_prices", {}) if not holdings or not historical_prices: return ( "Insufficient portfolio data for stress testing.", None, None, None, None ) # Initialise stress tester stress_tester = PortfolioStressTest( holdings=holdings, historical_prices=historical_prices, n_simulations=n_simulations ) LAST_STRESS_TEST = stress_tester # Get portfolio value portfolio_value = sum(h.get('market_value', 0) for h in holdings) # Run selected scenario or Monte Carlo if scenario_name == "monte_carlo": result = stress_tester.run_monte_carlo(time_horizon_days=time_horizon_days) dashboard_plot = create_stress_test_dashboard(result, portfolio_value) mc_plot = create_monte_carlo_paths_plot(result.simulation_paths, portfolio_value=portfolio_value) drawdown_plot = create_drawdown_analysis_chart(result.simulation_paths, portfolio_value=portfolio_value) scenario_plot = None summary = f"""# Monte Carlo Stress Test Results **Simulation Parameters:** - Number of simulations: {n_simulations:,} - Time horizon: {time_horizon_days} days (~{time_horizon_days/252:.1f} years) **Key Metrics:** - Mean return: {result.portfolio_return:.2f}% - Value at Risk (95%): {result.var_95:.2f}% - Conditional VaR (95%): {result.cvar_95:.2f}% - Maximum drawdown: {result.max_drawdown:.2f}% **Interpretation:** - In 95% of scenarios, your portfolio loses less than {abs(result.var_95):.1f}% - In the worst 5% of cases, average loss is {abs(result.cvar_95):.1f}% """ elif scenario_name == "all_scenarios": all_results = stress_tester.run_all_scenarios() scenario_plot = create_scenario_comparison_chart(all_results) dashboard_plot = None mc_plot = None drawdown_plot = None summary = "# All Stress Scenarios Comparison\n\n" summary += "**Portfolio Performance Under Historical Crises:**\n\n" for scenario_id, result in all_results.items(): recovery_text = f"{result.recovery_time_estimate} days (~{result.recovery_time_estimate/365:.1f} years)" if result.recovery_time_estimate else "N/A" summary += f"### {result.scenario_name}\n" summary += f"- Portfolio return: **{result.portfolio_return:.2f}%**\n" summary += f"- Value change: **${result.portfolio_value_change:,.0f}**\n" summary += f"- VaR 95%: {result.var_95:.2f}%\n" summary += f"- CVaR 95%: {result.cvar_95:.2f}%\n" summary += f"- Estimated recovery: {recovery_text}\n\n" else: # Run specific scenario scenario = STRESS_SCENARIOS.get(scenario_name) if not scenario: return ( f"Scenario '{scenario_name}' not found.", None, None, None, None ) result = stress_tester.apply_scenario(scenario) dashboard_plot = create_stress_test_dashboard(result, portfolio_value) mc_plot = None scenario_plot = None drawdown_plot = None recovery_text = f"{result.recovery_time_estimate} days (~{result.recovery_time_estimate/365:.1f} years)" if result.recovery_time_estimate else "Not estimated" summary = f"""# {result.scenario_name} - Stress Test Results **Scenario Description:** {scenario.description} **Portfolio Impact:** - Return under stress: **{result.portfolio_return:.2f}%** - Dollar impact: **${result.portfolio_value_change:,.0f}** - Current value: ${portfolio_value:,.0f} - Stressed value: ${portfolio_value + result.portfolio_value_change:,.0f} **Risk Metrics:** - Value at Risk (95%): {result.var_95:.2f}% - Conditional VaR (95%): {result.cvar_95:.2f}% - Value at Risk (99%): {result.var_99:.2f}% - Conditional VaR (99%): {result.cvar_99:.2f}% **Recovery Estimate:** - Estimated recovery time: {recovery_text} **Asset Contributions:** """ for ticker, contribution in sorted(result.asset_contributions.items(), key=lambda x: x[1]): summary += f"- {ticker}: {contribution:+.2f}%\n" return (summary, dashboard_plot, mc_plot, scenario_plot, drawdown_plot) except Exception as e: logger.error(f"Stress test error: {e}", exc_info=True) return ( f"Error running stress test: {str(e)}", None, None, None, None ) async def run_analysis_with_ui_update( session_state: Optional[Dict] = None, portfolio_text: str = "", roast_mode: bool = False, persona: Optional[str] = None, progress=gr.Progress() ) -> Tuple[str, str, str, Any, Any, Any, Any, Any, Any]: """Run analysis and return results with loading progress overlay. Uses rotating messages from LOADING_MESSAGES to display progress updates. Args: session_state: Session state dict containing user authentication info portfolio_text: Portfolio input roast_mode: If True, use brutal honesty mode persona: Optional investor persona (e.g., 'warren_buffett') progress: Gradio progress tracker Returns: Tuple of (page_state, analysis_text, performance_metrics, 5 charts, audio_btn_update) """ global LAST_ANALYSIS_STATE if not portfolio_text.strip(): return ( "input", "", "", None, None, None, None, None, gr.update(visible=False) ) try: progress(0, desc=random.choice(LOADING_MESSAGES)) await asyncio.sleep(0.3) holdings = parse_portfolio_input(portfolio_text) if not holdings: return ( "input", "", "", None, None, None, None, None, gr.update(visible=False) ) progress(0.1, desc=random.choice(LOADING_MESSAGES)) await asyncio.sleep(0.5) # Check if this is a demo session is_demo = session_state.get("is_demo", False) if session_state else False # Generate portfolio ID with appropriate prefix timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') if is_demo: portfolio_id = f"demo_{timestamp}" else: # Authenticated users get portfolio IDs with user prefix session = UserSession.from_dict(session_state) user_prefix = session.username[:8] if session and session.username else "user" portfolio_id = f"{user_prefix}_{timestamp}" # Only save portfolio to database for authenticated users # Demo users get ephemeral portfolios that exist only for the current session if not is_demo: try: session = UserSession.from_dict(session_state) user_id = session.user_id if session and session.user_id else None if not user_id: logger.error("No valid user_id available for portfolio creation") raise ValueError("User ID not available") portfolio_saved = await db.save_portfolio( portfolio_id=portfolio_id, user_id=user_id, name=f"Analysis {datetime.now().strftime('%Y-%m-%d %H:%M')}", risk_tolerance='moderate' ) if portfolio_saved: logger.info(f"✓ Portfolio {portfolio_id} created for user {user_id}") else: logger.error(f"✗ Failed to create portfolio {portfolio_id} in database") raise ValueError(f"Portfolio creation failed for {portfolio_id}") except Exception as e: logger.error(f"✗ Failed to save portfolio: {e}") raise ValueError(f"Database error: {e}") from e else: logger.info(f"Demo mode: Portfolio {portfolio_id} created (ephemeral, not saved to database)") initial_state: AgentState = { 'portfolio_id': portfolio_id, 'user_query': 'Analyse my portfolio', 'risk_tolerance': 'moderate', 'holdings': holdings, 'historical_prices': {}, 'fundamentals': {}, 'economic_data': {}, 'realtime_data': {}, 'technical_indicators': {}, 'optimisation_results': {}, 'risk_analysis': {}, 'ai_synthesis': '', 'recommendations': [], 'reasoning_steps': [], 'current_step': 'starting', 'errors': [], 'mcp_calls': [], 'phase_1_duration_ms': None, 'phase_1_5_duration_ms': None, 'phase_2_duration_ms': None, 'phase_2_5_duration_ms': None, 'phase_3_duration_ms': None, 'llm_input_tokens': None, 'llm_output_tokens': None, 'llm_total_tokens': None, 'llm_request_count': None, 'feature_vectors': {}, 'ensemble_forecasts': {}, 'sentiment_data': {}, } progress(0.2, desc="Phase 1: Fetching market data from Yahoo Finance, FMP, FRED...") # Create workflow with persona or roast mode # Note: persona takes precedence over roast mode if persona and persona != "standard": analysis_workflow = PortfolioAnalysisWorkflow(mcp_router, persona=persona) else: analysis_workflow = PortfolioAnalysisWorkflow(mcp_router, roast_mode=roast_mode) # Stream workflow execution with live progress updates final_state = None async for event in analysis_workflow.stream(initial_state): progress_val = event.get("progress", 0) message = event.get("message", "Processing...") progress(progress_val, desc=message) if event.get("event") == "complete": final_state = event.get("state") break final_state = event.get("state") if final_state is None: raise ValueError("Workflow did not return final state") LAST_ANALYSIS_STATE = final_state # Pre-generate export files immediately after analysis completes # This ensures DownloadButton has files ready on first click global LAST_EXPORT_PDF_PATH, LAST_EXPORT_CSV_PATH try: import tempfile # Generate PDF export pdf_bytes = export_analysis_to_pdf(final_state) with tempfile.NamedTemporaryFile( mode='wb', delete=False, suffix='.pdf', prefix='portfolio_analysis_', dir=tempfile.gettempdir() ) as f: f.write(pdf_bytes) LAST_EXPORT_PDF_PATH = f.name logger.info(f"Pre-generated PDF export: {LAST_EXPORT_PDF_PATH}") # Generate CSV export csv_content = export_analysis_to_csv(final_state) with tempfile.NamedTemporaryFile( mode='w', delete=False, suffix='.csv', prefix='portfolio_analysis_', dir=tempfile.gettempdir() ) as f: f.write(csv_content) LAST_EXPORT_CSV_PATH = f.name logger.info(f"Pre-generated CSV export: {LAST_EXPORT_CSV_PATH}") except Exception as e: logger.error(f"Failed to pre-generate export files: {e}") LAST_EXPORT_PDF_PATH = None LAST_EXPORT_CSV_PATH = None # Save analysis to database (Enhancement #4 - Historical Analysis Storage) # Only save for authenticated users - demo analyses are ephemeral if not is_demo: try: session = UserSession.from_dict(session_state) user_identifier = session.user_id if session and session.user_id else "unknown" logger.info(f"Saving analysis for portfolio {portfolio_id}, user: {user_identifier}") save_result = await db.save_analysis(portfolio_id, final_state) if save_result: logger.info(f"✓ Successfully saved analysis for portfolio {portfolio_id}") else: logger.error(f"✗ Failed to save analysis for portfolio {portfolio_id} (returned False)") # Note: We don't raise here because the analysis itself succeeded # The user still gets their results even if database save fails except Exception as e: logger.error(f"✗ Exception saving analysis for portfolio {portfolio_id}: {e}", exc_info=True) # Note: We don't raise here because the analysis itself succeeded else: logger.info(f"Demo mode: Analysis for portfolio {portfolio_id} not saved (ephemeral session)") charts = create_visualisations() analysis_text = format_analysis_result(final_state, holdings) performance_metrics = format_performance_metrics(final_state) progress(1.0, desc="Analysis complete!") return ( "results", analysis_text, performance_metrics, *charts, gr.update(visible=True) # analysis_audio_btn (show audio button) ) except Exception as e: logger.error(f"Analysis error: {e}", exc_info=True) return ( "input", f"❌ Error: {str(e)}", "", None, None, None, None, None, gr.update(visible=False) # analysis_audio_btn (hide on error) ) def update_live_preview(portfolio_text: str) -> str: """Update live preview with parsed portfolio summary (without prices). Aggregates duplicate tickers and detects mixed entry types, displaying warnings for data quality issues. Args: portfolio_text: Raw portfolio input text Returns: HTML formatted preview of portfolio """ if not portfolio_text.strip(): return """

Enter portfolio holdings to see preview

""" try: holdings = parse_portfolio_input(portfolio_text) if not holdings: return """

Unable to parse portfolio format

""" aggregated = aggregate_holdings(holdings) unique_tickers = len(aggregated) shares_tickers = sum(1 for d in aggregated.values() if d['total_quantity'] > 0) dollar_tickers = sum(1 for d in aggregated.values() if d['total_dollar_amount'] > 0) warnings = [] for data in aggregated.values(): warnings.extend(data['warnings']) warnings_html = "" if warnings: warnings_html = f"""

Duplicate/Mixed Entries

""" for warning in warnings: warnings_html += f"

{warning}

" warnings_html += "
" html = f"""

Portfolio Preview

Unique Tickers

{unique_tickers}

{shares_tickers} by shares • {dollar_tickers} by dollar

Assets (Aggregated)

""" for ticker, data in sorted(aggregated.items()): if data['total_dollar_amount'] > 0 and data['total_quantity'] > 0: value_text = f"{data['total_quantity']:.2f} sh + ${data['total_dollar_amount']:,.0f}" elif data['total_dollar_amount'] > 0: value_text = f"${data['total_dollar_amount']:,.0f}" else: value_text = f"{data['total_quantity']:.2f} sh" html += f"""
{ticker} {value_text}
""" html += f"""

Click "Get Current Prices" for live valuation

{warnings_html}
""" return html except Exception as e: logger.error(f"Preview update error: {e}") return f"

Error: {str(e)}

" async def fetch_and_update_preview(portfolio_text: str) -> str: """Fetch current prices and update preview with calculated values. Aggregates duplicate tickers and displays combined positions with pricing information from market data sources. Args: portfolio_text: Raw portfolio input text Returns: HTML formatted preview with current prices """ if not portfolio_text.strip(): return update_live_preview(portfolio_text) try: holdings = parse_portfolio_input(portfolio_text) if not holdings: return update_live_preview(portfolio_text) aggregated = aggregate_holdings(holdings) # Get tickers that need price lookup (have quantity but no dollar_amount) tickers_needing_prices = [ ticker for ticker, data in aggregated.items() if data['total_quantity'] > 0 and data['total_dollar_amount'] == 0 ] # Fetch current prices prices = {} if tickers_needing_prices: try: from backend.mcp_router import mcp_router quote_results = await mcp_router.call_yahoo_finance_mcp( tool="get_quote", params={"tickers": tickers_needing_prices} ) if quote_results and isinstance(quote_results, list): for quote in quote_results: if isinstance(quote, dict) and 'ticker' in quote and 'price' in quote: prices[quote['ticker']] = float(quote['price']) except Exception as e: logger.error(f"Error fetching prices: {e}") # Calculate aggregated values total_value = 0 aggregated_values = {} for ticker, data in aggregated.items(): if data['total_dollar_amount'] > 0: aggregated_values[ticker] = data['total_dollar_amount'] total_value += data['total_dollar_amount'] elif data['total_quantity'] > 0: price = prices.get(ticker, 0) value = data['total_quantity'] * price aggregated_values[ticker] = value total_value += value unique_tickers = len(aggregated) warnings = [] for data in aggregated.values(): warnings.extend(data['warnings']) warnings_html = "" if warnings: warnings_html = f"""

Duplicate/Mixed Entries

""" for warning in warnings: warnings_html += f"

{warning}

" warnings_html += "
" html = f"""

Portfolio Summary

Unique Tickers

{unique_tickers}

Total Value

${total_value:,.2f}

Holdings Breakdown (Aggregated)

""" for ticker in sorted(aggregated.keys()): data = aggregated[ticker] current_value = aggregated_values.get(ticker, 0) weight = (current_value / total_value * 100) if total_value > 0 else 0 if data['total_dollar_amount'] > 0 and data['total_quantity'] > 0: detail = f"{data['total_quantity']:.2f} sh + ${data['total_dollar_amount']:,.2f}" elif data['total_dollar_amount'] > 0: detail = f"${data['total_dollar_amount']:,.2f}" elif data['total_quantity'] > 0: price = prices.get(ticker, 0) if price > 0: detail = f"{data['total_quantity']:.2f} sh × ${price:.2f}" else: detail = f"{data['total_quantity']:.2f} sh (price unavailable)" else: detail = "—" html += f"""
{ticker}
{detail}
${current_value:,.2f}
{weight:.1f}%
""" html += f"""

Prices updated

{warnings_html}
""" return html except Exception as e: logger.error(f"Fetch preview error: {e}") return f"

Error fetching prices: {str(e)}

" def create_interface() -> gr.Blocks: """Create the main Gradio interface with single-page workflow. Returns: Gradio Blocks interface """ theme = get_financial_theme() # Custom CSS for full-width and responsive design custom_css = FINANCIAL_CSS + """ /* Full-width container */ .gradio-container { max-width: unset; width: 100%; padding: 1.5rem 2rem; } /* Sidebar Styling - Works with gr.Sidebar */ #main-sidebar { background: linear-gradient(135deg, #1e3a5f 0%, #2d5a7b 100%) !important; } #main-sidebar .sidebar-header { color: white !important; border-bottom: 2px solid rgba(255,255,255,0.2); padding-bottom: 10px; margin-bottom: 20px; } .nav-btn { width: 100%; margin-bottom: 10px; text-align: left !important; justify-content: flex-start !important; background: rgba(255,255,255,0.05) !important; border: 1px solid rgba(255,255,255,0.1) !important; color: white !important; } .nav-btn:hover { background: rgba(255,255,255,0.15) !important; border-color: rgba(255,255,255,0.3) !important; } .sidebar-header { color: white !important; border-bottom: 2px solid rgba(255,255,255,0.2); padding-bottom: 10px; margin-bottom: 20px; } /* Ensure all large buttons have consistent font size */ button.lg, .lg button { font-size: 1rem !important; font-weight: 600 !important; } /* Dark mode specific container adjustments - theme adaptive */ .dark .gradio-container { background-color: var(--body-background-fill-dark); } /* Responsive adjustments */ @media (max-width: 768px) { .gradio-container { padding: 1rem; } } /* Better spacing for input section */ .gr-group { padding: 24px; margin-bottom: 16px; } /* Format helper text styling - theme adaptive */ .format-helper { font-size: 14px; margin-top: 8px; } .dark .format-helper { color: var(--body-text-color-subdued); } /* Chart containers with proper sizing */ .plot-container { min-height: 400px; } /* Example buttons better spacing */ .gr-examples { margin-top: 16px; } /* Improved markdown sections - theme adaptive */ .dark .markdown-text { color: var(--body-text-color); } .dark .markdown-text h3 { color: var(--body-text-color); border-bottom: 1px solid var(--block-border-color); padding-bottom: 8px; } /* Hero section styling */ #hero-section { background: linear-gradient(135deg, #05478A 0%, #048CFC 50%, #11C7AA 100%); padding: 3rem 2rem; border-radius: 16px; text-align: center; margin-bottom: 2rem; color: white; } #hero-section h2 { font-size: 2.5rem; font-weight: 700; margin: 0 0 0.5rem 0; letter-spacing: -0.025em; } #hero-section p { font-size: 1.125rem; opacity: 0.95; margin: 0 0 1.5rem 0; font-weight: 400; } #hero-section .value-props { display: flex; justify-content: center; gap: 2rem; flex-wrap: wrap; margin-top: 1.5rem; } #hero-section .value-prop { display: flex; align-items: center; gap: 0.5rem; font-size: 0.95rem; font-weight: 500; } /* Compact feature cards for 1x4 layout */ .feature-card-compact { background: rgba(255, 255, 255, 0.05); backdrop-filter: blur(10px); -webkit-backdrop-filter: blur(10px); border: 1px solid rgba(255, 255, 255, 0.2); border-radius: 8px; padding: 1rem 0.75rem; box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); transition: transform 0.2s ease, box-shadow 0.2s ease, border-color 0.2s ease; text-align: center; min-height: 130px; display: flex; flex-direction: column; justify-content: center; } .feature-card-compact:hover { transform: translateY(-2px); box-shadow: 0 4px 8px rgba(0, 0, 0, 0.15); border-color: rgba(4, 140, 252, 0.4); } .feature-card-compact h4 { color: #048CFC; margin: 0.5rem 0 0.25rem 0; font-weight: 600; font-size: 0.9rem; } .feature-card-compact p { color: var(--body-text-color); opacity: 0.75; font-size: 0.75rem; margin: 0; line-height: 1.4; } .feature-icon-compact { font-size: 1.5rem; margin-bottom: 0.25rem; } /* Task selection cards - Light mode (default) */ .task-card { min-height: 160px; padding: 1.5rem 1rem; text-align: center; white-space: pre-line; transition: transform 0.2s ease, box-shadow 0.2s ease, border-color 0.2s ease; border-radius: 12px; background: white; border: 1px solid #e5e7eb; box-shadow: 0 2px 8px rgba(0, 0, 0, 0.08); color: #1f2937; } /* Dark mode task cards */ .dark .task-card { background: rgba(255, 255, 255, 0.05); border: 1px solid rgba(255, 255, 255, 0.2); box-shadow: none; color: inherit; } .task-card:hover:not(:disabled) { transform: translateY(-3px); box-shadow: 0 8px 20px rgba(0, 0, 0, 0.15); border-color: #288cfa; } .dark .task-card:hover:not(:disabled) { box-shadow: 0 6px 16px rgba(0, 0, 0, 0.2); border-color: rgba(4, 140, 252, 0.5); } .task-card:disabled { opacity: 0.5; cursor: not-allowed; } .task-card-icon { font-size: 2.5rem; margin-bottom: 0.75rem; } .task-card-title { font-size: 1.1rem; font-weight: 600; margin-bottom: 0.5rem; color: #1f2937; } .dark .task-card-title { color: inherit; } .task-card-description { font-size: 0.85rem; color: #4b5563; line-height: 1.4; } .dark .task-card-description { color: inherit; opacity: 0.8; } .coming-soon-badge { font-size: 0.7rem; background: rgba(255, 255, 255, 0.1); padding: 0.25rem 0.5rem; border-radius: 4px; margin-top: 0.5rem; display: inline-block; } /* Task page container */ .task-selection-container { max-width: 900px; margin: 0 auto; padding: 2rem; } /* Preview card - scrollable with wider layout */ .preview-card { background: rgba(255, 255, 255, 0.05); backdrop-filter: blur(10px); border: 1px solid rgba(255, 255, 255, 0.2); border-radius: 12px; padding: 0; flex: 1 1 auto !important; max-height: 800px !important; min-height: 0; overflow-y: auto; overflow-x: hidden; display: flex !important; flex-direction: column !important; box-sizing: border-box !important; } /* Custom scrollbar for preview card */ .preview-card::-webkit-scrollbar { width: 8px; } .preview-card::-webkit-scrollbar-track { background: rgba(255, 255, 255, 0.05); border-radius: 4px; } .preview-card::-webkit-scrollbar-thumb { background: rgba(4, 140, 252, 0.3); border-radius: 4px; } .preview-card::-webkit-scrollbar-thumb:hover { background: rgba(4, 140, 252, 0.5); } /* Firefox */ .preview-card { scrollbar-width: thin; scrollbar-color: rgba(4, 140, 252, 0.3) rgba(255, 255, 255, 0.05); } /* Scrollable markdown for bull/bear cases */ .scrollable-markdown { max-height: 400px; overflow-y: auto; padding-right: 10px; border: 1px solid rgba(255, 255, 255, 0.1); border-radius: 8px; padding: 1rem; background: rgba(255, 255, 255, 0.02); } .scrollable-markdown::-webkit-scrollbar { width: 6px; } .scrollable-markdown::-webkit-scrollbar-track { background: rgba(255, 255, 255, 0.05); border-radius: 3px; } .scrollable-markdown::-webkit-scrollbar-thumb { background: rgba(4, 140, 252, 0.3); border-radius: 3px; } /* Input card - unified with examples */ .input-card { background: rgba(255, 255, 255, 0.05); backdrop-filter: blur(10px); border: 1px solid rgba(255, 255, 255, 0.2); border-radius: 12px; padding: 1.5rem; margin-bottom: 0; flex: 1 1 auto !important; display: flex !important; flex-direction: column !important; overflow-y: visible !important; overflow-x: visible !important; max-height: none !important; min-height: 0; box-sizing: border-box !important; gap: 0 !important; } /* Custom scrollbar for input card */ .input-card::-webkit-scrollbar { width: 8px; } .input-card::-webkit-scrollbar-track { background: rgba(255, 255, 255, 0.05); border-radius: 4px; } .input-card::-webkit-scrollbar-thumb { background: rgba(4, 140, 252, 0.3); border-radius: 4px; } .input-card::-webkit-scrollbar-thumb:hover { background: rgba(4, 140, 252, 0.5); } /* Firefox scrollbar */ .input-card { scrollbar-width: thin; scrollbar-color: rgba(4, 140, 252, 0.3) rgba(255, 255, 255, 0.05); } .examples-header { margin-top: 0.25rem; margin-bottom: 0.25rem; padding-top: 0.5rem; border-top: 1px solid rgba(255, 255, 255, 0.1); color: #048CFC; font-weight: 600; font-size: 1rem; } /* Portfolio row - natural heights without stretching */ .portfolio-row { display: flex !important; flex-direction: row !important; align-items: stretch !important; height: 100% !important; min-height: 600px !important; } .portfolio-row > .gr-column { display: flex !important; flex-direction: column !important; flex: 1 1 0% !important; height: 100% !important; min-height: 100% !important; max-height: 100% !important; gap: 0 !important; overflow: visible !important; } .portfolio-row > * { flex: 1 !important; } .portfolio-row .block { display: flex !important; flex-direction: column !important; flex: 1 !important; } /* Button fixed sizing - prevent vertical expansion */ .portfolio-row button { flex: 0 0 45px !important; width: 100% !important; height: 45px !important; min-height: 45px !important; max-height: 45px !important; } /* Flex items min-height to prevent overflow */ .input-card > *:not([class*="dropdown"]), .preview-card > *:not([class*="dropdown"]) { min-height: 0; } /* Dropdown styling and z-index */ .gr-dropdown { z-index: 999 !important; position: relative !important; } .gr-dropdown-container { z-index: 999 !important; overflow: visible !important; position: relative !important; } /* Dropdown menu positioning - appear below button */ .gr-dropdown ul[role="listbox"], .gr-dropdown .options, .gr-dropdown-menu { z-index: 9999 !important; overflow: visible !important; position: absolute !important; top: 100% !important; left: 0 !important; width: 100% !important; max-height: 300px !important; overflow-y: auto !important; } /* Loading page styling */ .loading-header { animation: fadeIn 0.5s ease-in; } .loading-message { text-align: center; font-size: 1.1rem; color: #048CFC; padding: 2rem; min-height: 100px; display: flex; align-items: center; justify-content: center; } @keyframes fadeIn { from { opacity: 0; transform: translateY(-10px); } to { opacity: 1; transform: translateY(0); } } /* Example buttons styling */ .gr-examples button { background: linear-gradient(135deg, rgba(4, 140, 252, 0.1), rgba(17, 199, 170, 0.1)); border: 1px solid rgba(4, 140, 252, 0.3); border-radius: 8px; padding: 0.75rem 1rem; font-size: 0.9rem; transition: all 0.2s ease; } .gr-examples button:hover { background: linear-gradient(135deg, rgba(4, 140, 252, 0.2), rgba(17, 199, 170, 0.2)); border-color: rgba(4, 140, 252, 0.6); transform: translateY(-2px); } /* Mobile responsive - feature cards stack on small screens */ @media (max-width: 1024px) { .feature-card-compact { padding: 1.25rem; min-height: auto; } } /* Responsive adjustments */ @media (max-width: 768px) { #hero-section { padding: 2rem 1rem; } #hero-section h2 { font-size: 1.75rem; } #hero-section p { font-size: 1rem; } #hero-section .value-props { gap: 1rem; } .feature-card-compact { padding: 1.25rem; } .preview-card { max-height: 400px; } } /* Accessibility - WCAG AA Colour Contrast Improvements */ .demo-subtitle { color: rgba(255, 255, 255, 0.95) !important; /* Increased from 0.7 for better contrast */ font-size: 14px; } .disclaimer-text { color: rgba(255, 255, 255, 0.95) !important; /* Increased contrast */ font-size: 13px; } /* Update all placeholder text for better visibility */ input::placeholder, textarea::placeholder { color: rgba(255, 255, 255, 0.7) !important; /* Increased from 0.5 */ } /* Add focus indicators for keyboard navigation */ input:focus, textarea:focus, button:focus, select:focus { outline: 3px solid #4A9EFF !important; outline-offset: 2px; } /* Validation message colours with better contrast */ #signup-email-validation, #password-match-validation { font-size: 0.85rem; margin-top: 0.25rem; font-weight: 500; } """ with gr.Blocks( title="Portfolio Intelligence Platform", theme=theme, css=custom_css, fill_width=True, fill_height=True ) as demo: # Session state for authentication session_state = gr.State({}) # Pagination state for history views history_current_page = gr.State(1) history_current_page_results = gr.State(1) # Hero Section with auth status with gr.Row(): gr.HTML( """

Portfolio Intelligence Platform

AI-powered portfolio analysis with transparent multi-agent MCP orchestration

9 MCP Servers
Quantitative Models
Claude Sonnet 4.5
""", elem_classes="hero-container" ) # User info and logout button with gr.Row(): with gr.Column(scale=4): user_info = gr.Markdown("Not logged in", elem_id="user-info") with gr.Column(scale=1): logout_btn = gr.Button("Sign Out", variant="secondary", size="sm", visible=False) # Navigation sidebar using Gradio's native component with gr.Sidebar(position="left", open=False, visible=False, elem_id="main-sidebar") as sidebar: gr.Markdown("## Navigation", elem_classes="sidebar-header") # Navigation options nav_new_analysis = gr.Button("🏠 Home", variant="secondary", size="lg", elem_classes="nav-btn") nav_view_history = gr.Button("📜 View History", variant="secondary", size="lg", elem_classes="nav-btn") gr.Markdown("---") # User section (when logged in) with gr.Group(visible=False) as sidebar_user_section: gr.Markdown("### Account") sidebar_user_email = gr.Markdown("") nav_profile_btn = gr.Button("👤 Profile", variant="secondary", size="sm") nav_signout_btn = gr.Button("🚪 Sign Out", variant="secondary", size="sm") # Authentication container with gr.Group(visible=True, elem_id="auth-container") as login_container: gr.Markdown("## 🔐 Sign In to Continue") gr.Markdown("Please sign in or create an account to analyse your portfolio") with gr.Tabs(): # Login Tab with gr.Tab("Sign In"): login_email = gr.Textbox( label="Email Address", placeholder="your.email@example.com", type="email", elem_id="login-email" ) login_password = gr.Textbox( label="Password", placeholder="Enter your password", type="password", elem_id="login-password" ) forgot_password_btn = gr.Button( "Forgot password?", variant="secondary", size="sm", elem_classes="forgot-password-link" ) login_btn = gr.Button("Sign In", variant="primary", size="lg") login_message = gr.Markdown("") # Signup Tab with gr.Tab("Create Account"): signup_username = gr.Textbox( label="Username (optional)", placeholder="Choose a username", elem_id="signup-username" ) signup_email = gr.Textbox( label="Email Address", placeholder="your.email@example.com", type="email", elem_id="signup-email" ) signup_email_validation = gr.Markdown("", elem_id="signup-email-validation") signup_password = gr.Textbox( label="Password", placeholder="At least 15 characters (NIST 2024 requirement)", type="password", elem_id="signup-password" ) signup_password_strength = gr.Markdown("", elem_id="password-strength") signup_confirm = gr.Textbox( label="Confirm Password", placeholder="Re-enter your password", type="password", elem_id="signup-confirm-password" ) signup_password_match = gr.Markdown("", elem_id="password-match-validation") gr.Markdown( "*By signing up, you agree this is for educational purposes only and not financial advice.*", elem_classes="disclaimer-text" ) signup_btn = gr.Button("Create Account", variant="primary", size="lg") signup_message = gr.Markdown("") # Demo mode option gr.Markdown("---") gr.Markdown("## 🎯 Or Try Without Signing Up") demo_btn = gr.Button( "Try Demo", variant="secondary", size="lg" ) gr.Markdown("*1 free analysis per day • No account required*", elem_classes="demo-subtitle") demo_message = gr.Markdown("") # Modal 1: Request password reset (only email input) with gr.Group(visible=False, elem_id="password-reset-modal") as password_reset_modal: gr.Markdown("## Request Password Reset") gr.Markdown("Enter your email address and we'll send you a password reset link.") reset_email = gr.Textbox( label="Email Address", placeholder="your.email@example.com", type="email", elem_id="reset-email" ) with gr.Row(): reset_submit_btn = gr.Button("Send Reset Link", variant="primary", size="lg") reset_cancel_btn = gr.Button("Cancel", variant="secondary", size="lg") reset_message = gr.Markdown("") # Modal 2: Set new password (only appears after clicking email link) with gr.Group(visible=False, elem_id="password-update-modal") as password_update_modal: gr.Markdown("## Set New Password") gr.Markdown("Enter your new password below.") # Hidden fields to store recovery token_hash and email (populated by JavaScript) recovery_token_hash = gr.Textbox(visible=False, elem_id="recovery-token-hash") recovery_email = gr.Textbox( label="Email Address", placeholder="your.email@example.com", type="email", info="Enter the email address you used to request the password reset", elem_id="recovery-email" ) new_password = gr.Textbox( label="New Password", placeholder="At least 15 characters (NIST 2024 requirement)", type="password", elem_id="new-password" ) confirm_new_password = gr.Textbox( label="Confirm New Password", placeholder="Re-enter your new password", type="password", elem_id="confirm-new-password" ) with gr.Row(): update_password_btn = gr.Button("Update Password", variant="primary", size="lg") update_cancel_btn = gr.Button("Cancel", variant="secondary", size="lg") update_password_message = gr.Markdown("") # Task Selection Page (shown after authentication) with gr.Group(visible=False, elem_classes="task-selection-container") as task_page: gr.Markdown("## What would you like to do?", elem_classes="section-title") with gr.Row(equal_height=True): # Analyse Portfolio - enabled with gr.Column(scale=1, min_width=200): task_analyse_btn = gr.Button( value="📊\n\nAnalyse Portfolio\n\nGet comprehensive analysis of your current holdings", elem_classes=["task-card"], variant="primary", size="lg" ) # Build Portfolio - enabled with gr.Column(scale=1, min_width=200): task_build_btn = gr.Button( value="🏗️\n\nBuild Portfolio\n\nAI helps construct a portfolio based on your goals", elem_classes=["task-card"], variant="primary", interactive=True, size="lg" ) # Compare Strategies - enabled with gr.Column(scale=1, min_width=200): task_compare_btn = gr.Button( value="⚔️\n\nCompare Strategies\n\nSee bull vs bear perspectives on your investments", elem_classes=["task-card"], variant="primary", interactive=True, size="lg" ) # Test Changes with gr.Column(scale=1, min_width=200): task_test_btn = gr.Button( value="🔬\n\nTest Changes\n\nPreview impact before making portfolio changes", elem_classes=["task-card"], variant="secondary", size="lg" ) # Build Portfolio Page (shown when Build Portfolio task selected) with gr.Group(visible=False, elem_classes="build-portfolio-container") as build_page: gr.Markdown("## Build Your Portfolio", elem_classes="section-title") gr.Markdown("Tell us about your investment goals and we'll build a portfolio for you.") with gr.Row(equal_height=True): # Left column: Goal settings with gr.Column(scale=2): with gr.Group(elem_classes="input-card"): build_goals = gr.CheckboxGroup( choices=["Growth", "Income", "Capital Preservation", "Speculation"], label="Investment Goals", info="Select one or more investment objectives" ) build_risk_tolerance = gr.Slider( minimum=1, maximum=10, value=5, step=1, label="Risk Tolerance", info="1 = Very Conservative, 10 = Very Aggressive" ) build_constraints = gr.Textbox( label="Constraints (Optional)", placeholder="e.g., 'No crypto, ESG focused, minimum 10 stocks, max 5% per position'", lines=3, info="Any specific requirements or restrictions" ) with gr.Row(): build_submit_btn = gr.Button( "Build My Portfolio", variant="primary", size="lg" ) build_back_btn = gr.Button( "Back", variant="secondary", size="lg" ) # Right column: Results with gr.Column(scale=3): # Agent activity stream build_agent_chat = gr.Chatbot( label="🤖 Agent Activity", type="messages", height=400, visible=False, show_copy_button=True, elem_classes="agent-chat" ) with gr.Group(elem_classes="preview-card", visible=False) as build_results_container: build_status = gr.Markdown("", elem_classes="build-status") # Audio narration button and player with gr.Row(): build_audio_btn = gr.Button( "🔊 Listen to Portfolio", variant="secondary", size="sm", visible=False ) with gr.Row(): build_audio_player = gr.Audio( label="Portfolio Summary Audio", interactive=False, visible=False, show_download_button=True ) with gr.Row(): build_regenerate_btn = gr.Button("Regenerate", variant="secondary", size="sm") # Compare Strategies Page (shown when Compare Strategies task selected) with gr.Group(visible=False, elem_classes="compare-strategies-container") as compare_page: gr.Markdown("## Compare Strategies", elem_classes="section-title") gr.Markdown("Get bull and bear perspectives on your portfolio through multi-agent debate.") with gr.Row(equal_height=True): # Left column: Portfolio input with gr.Column(scale=2): with gr.Group(elem_classes="input-card"): compare_portfolio_input = gr.Textbox( placeholder="AAPL 50\nTSLA 25 shares\nNVDA $5000", label="Portfolio Holdings", lines=6, info="Enter your holdings to analyse" ) with gr.Row(): compare_submit_btn = gr.Button( "Run Analysis", variant="primary", size="lg" ) compare_back_btn = gr.Button( "Back", variant="secondary", size="lg" ) # Right column: Results with gr.Column(scale=3): # Debate activity stream compare_debate_chat = gr.Chatbot( label="🎭 Advisory Council Debate", type="messages", height=500, visible=False, show_copy_button=True, elem_classes="debate-chat" ) with gr.Group(elem_classes="preview-card", visible=False) as compare_results_container: compare_status = gr.Markdown("", elem_classes="compare-status") # Bull vs Bear side-by-side with gr.Row(): with gr.Column(): gr.Markdown("### Bull Case") compare_bull_case = gr.Markdown("", elem_classes="scrollable-markdown") compare_bull_confidence = gr.Number(label="Confidence %", interactive=False) with gr.Column(): gr.Markdown("### Bear Case") compare_bear_case = gr.Markdown("", elem_classes="scrollable-markdown") compare_bear_confidence = gr.Number(label="Confidence %", interactive=False) # Consensus gr.Markdown("### Consensus Recommendation") compare_consensus = gr.Markdown("") compare_stance = gr.Textbox(label="Stance", interactive=False) # Audio debate button and player with gr.Row(): compare_audio_btn = gr.Button( "🎭 Listen to Debate", variant="secondary", size="sm", visible=False ) with gr.Row(): compare_audio_player = gr.Audio( label="Advisory Council Debate Audio", interactive=False, visible=False, show_download_button=True ) # Debate transcript with gr.Accordion("View Full Debate", open=False): compare_debate_transcript = gr.JSON(label="Debate Rounds") # Test Changes Page (shown when Test Changes task selected) with gr.Group(visible=False, elem_classes="test-changes-container") as test_page: gr.Markdown("## Test Portfolio Changes", elem_classes="section-title") gr.Markdown("Preview the impact of changes before making them.") with gr.Row(equal_height=True): # Left column: Portfolio and changes input with gr.Column(scale=2): with gr.Group(elem_classes="input-card"): test_portfolio_input = gr.Textbox( placeholder="AAPL 30%\nTSLA 25%\nNVDA 25%\nBND 20%", label="Current Portfolio (ticker weight%)", lines=6, info="Enter your current portfolio allocations" ) test_changes_input = gr.Textbox( placeholder="sell TSLA 10\nbuy VTI 10", label="Proposed Changes", lines=4, info="Format: action ticker amount (e.g., 'sell TSLA 10' or 'buy VTI 15')" ) test_portfolio_value = gr.Number( label="Portfolio Value ($)", value=100000, info="Total portfolio value for calculations" ) with gr.Row(): test_submit_btn = gr.Button( "Run Simulation", variant="primary", size="lg" ) test_back_btn = gr.Button( "Back", variant="secondary", size="lg" ) # Right column: Results with gr.Column(scale=3): with gr.Group(elem_classes="preview-card", visible=False) as test_results_container: test_status = gr.Markdown("", elem_classes="test-status") # Side-by-side metrics comparison with gr.Row(): with gr.Column(): gr.Markdown("### Current Portfolio") test_current_metrics = gr.Dataframe( headers=["Metric", "Value"], label="Current Metrics", interactive=False ) with gr.Column(): gr.Markdown("### After Changes") test_simulated_metrics = gr.Dataframe( headers=["Metric", "Value"], label="Simulated Metrics", interactive=False ) # Impact summary gr.Markdown("### Impact Analysis") test_impact_summary = gr.Dataframe( headers=["Metric", "Current", "Simulated", "Change", "% Change"], label="Impact Summary", interactive=False ) # Stress test comparison with gr.Accordion("Stress Test Comparison", open=False): test_stress_comparison = gr.Dataframe( headers=["Scenario", "Shock %", "Current Loss", "Simulated Loss", "Improvement"], label="Stress Tests", interactive=False ) # Recommendations and assessment test_assessment = gr.Markdown("", elem_classes="test-assessment") test_recommendations = gr.Markdown("", label="Recommendations") # Input Page with side-by-side layout (hidden until authenticated) with gr.Group(visible=False) as input_page: # Side-by-side input and preview (2:3 ratio for wider preview) with gr.Row(equal_height=True, elem_classes="portfolio-row"): # Left column: Input + Examples (unified card) with gr.Column(scale=2): with gr.Group(elem_classes="input-card"): portfolio_input = gr.Textbox( placeholder="AAPL 50\nTSLA 25 shares\nNVDA $5000\nGLD 10 shares", label="Portfolio Holdings", lines=8, info="Enter your holdings below (see examples for format)" ) # Load past portfolio load_past_portfolio_dropdown = gr.Dropdown( choices=[], label="Load Past Portfolio (Last 3)", interactive=True, elem_id="load-past-portfolio-dropdown", info="Your 3 most recent portfolios" ) # Persona Selection persona_choices = [ ("Standard Analysis", "standard"), ("Warren Buffett - Value Investing", "warren_buffett"), ("Cathie Wood - Innovation & Growth", "cathie_wood"), ("Ray Dalio - Macro & Diversification", "ray_dalio"), ] persona_dropdown = gr.Dropdown( choices=persona_choices, value="standard", label="Analysis Persona", info="Choose an investor perspective for the analysis" ) # Roast Mode Toggle roast_mode_toggle = gr.Checkbox( label="Roast Mode", value=False, info="Enable brutal honesty mode for portfolio critique (only works with Standard Analysis)" ) # Action buttons with gr.Row(): analyse_btn = gr.Button( "Analyse Portfolio", variant="primary", size="lg", min_width=290 ) input_back_btn = gr.Button( "Back", variant="secondary", size="lg", min_width=290 ) # Examples integrated as simple buttons gr.Markdown("### Example Portfolios", elem_classes="examples-header") with gr.Row(): tech_btn = gr.Button("Tech Growth", size="sm", scale=1) conservative_btn = gr.Button("Conservative Income", size="sm", scale=1) balanced_btn = gr.Button("Balanced 60/40", size="sm", scale=1) global_btn = gr.Button("Global Diversified", size="sm", scale=1) risk_btn = gr.Button("Single Stock Risk", size="sm", scale=1) # Right column: Live preview (wider at scale=3) with gr.Column(scale=3): preview_output = gr.HTML( value=update_live_preview(""), elem_classes="preview-card" ) refresh_prices_btn = gr.Button( "🔄 Get Current Prices", variant="primary", size="sm", scale=1 ) # JavaScript to detect password reset token_hash in URL and populate hidden fields # This runs on page load and checks for recovery token in query parameters demo.load( fn=None, # No Python processing needed - JavaScript directly sets the value inputs=[], outputs=[recovery_token_hash], js=""" function() { // Check for recovery token_hash in URL query parameters (PKCE flow) const params = new URLSearchParams(window.location.search); const tokenHash = params.get('token_hash') || ''; const typeParam = params.get('type') || ''; if (tokenHash && typeParam === 'recovery') { console.log('Password reset token detected'); // Clean up URL (remove query parameters) window.history.replaceState({}, document.title, window.location.pathname); return tokenHash; } return ''; } """ ) # Add click handlers for example buttons tech_btn.click( fn=lambda: "AAPL 50 shares\nTSLA 25 shares\nNVDA 30 shares\nMETA 20 shares", outputs=portfolio_input, show_api=False ) conservative_btn.click( fn=lambda: "VOO 100 shares\nVTI 75 shares\nSCHD 50 shares\nTLT 40 shares\nVXUS 60 shares", outputs=portfolio_input, show_api=False ) balanced_btn.click( fn=lambda: "VTI $25000\nVXUS $15000\nBND $15000\nGLD $5000", outputs=portfolio_input, show_api=False ) global_btn.click( fn=lambda: "VTI $15000\nVXUS $10000\nVWO $5000\nBND $10000\nGLD $3000\nVNQ $2000", outputs=portfolio_input, show_api=False ) risk_btn.click( fn=lambda: "TSLA 100 shares", outputs=portfolio_input, show_api=False ) # Feature highlights grid (1x4 compact layout) gr.Markdown("### Key Features", elem_classes="section-title") with gr.Row(): with gr.Column(scale=1, min_width=200): gr.HTML( """
🧠

Smart Analysis

HRP, Black-Litterman, VaR/CVaR

""" ) with gr.Column(scale=1, min_width=200): gr.HTML( """

Fast Results

1-2 minutes

""" ) with gr.Column(scale=1, min_width=200): gr.HTML( """
🔒

Secure

No data stored

""" ) with gr.Column(scale=1, min_width=200): gr.HTML( """

AI Powered

Claude Sonnet 4.5

""" ) # Loading Page with gr.Group(visible=False) as loading_page: with gr.Row(): with gr.Column(): gr.HTML( """

Analysing Your Portfolio

""", elem_classes="loading-header" ) loading_message = gr.Markdown( value="Initialising analysis...", elem_classes="loading-message", ) gr.HTML( """

This may take 1-2 minutes as we run deep learning models on your portfolio

""" ) # Results Page (tabbed interface) with gr.Group(visible=False) as results_page: # Header with actions (always visible) with gr.Row(elem_id="results-header"): with gr.Column(scale=3): gr.Markdown("# Portfolio Analysis Results", elem_classes="page-title") with gr.Column(scale=1): with gr.Row(): export_pdf_btn = gr.DownloadButton("📄 Export PDF", size="sm") export_csv_btn = gr.DownloadButton("📊 Export CSV", size="sm") # Main tabbed interface with gr.Tabs() as results_tabs: # Tab 1: Analysis Results with gr.Tab("📊 Analysis Results"): with gr.Column(): analysis_output = gr.Markdown("") # Audio narration button and player with gr.Row(): analysis_audio_btn = gr.Button( "🔊 Listen to Analysis", variant="secondary", size="sm", visible=False ) with gr.Row(): analysis_audio_player = gr.Audio( label="Audio Summary", interactive=False, visible=False, show_download_button=True ) # Performance Metrics Accordion (progressive disclosure) with gr.Accordion("Performance Metrics & Reasoning", open=False): performance_metrics_output = gr.Markdown("") # Tab 2: Dashboard with gr.Tab("📈 Dashboard"): # Top row - Portfolio allocation + Risk metrics with gr.Row(equal_height=True): with gr.Column(scale=1, min_width=400): allocation_plot = gr.Plot(label="Portfolio Allocation", container=True) with gr.Column(scale=1, min_width=400): risk_plot = gr.Plot(label="Risk Metrics Dashboard", container=True) # Middle row - Performance chart with gr.Row(): performance_plot = gr.Plot(label="Historical Performance", container=True) # Bottom row - Correlation + Optimisation with gr.Row(equal_height=True): with gr.Column(scale=1, min_width=400): correlation_plot = gr.Plot(label="Asset Correlation Matrix", container=True) with gr.Column(scale=1, min_width=400): optimization_plot = gr.Plot(label="Optimisation Methods Comparison", container=True) # Tab 3: Tax Analysis with gr.Tab("💰 Tax Analysis"): gr.Markdown("Analyse tax implications and identify tax-loss harvesting opportunities.") # Top row: Input controls with gr.Row(): tax_filing_status = gr.Dropdown( choices=[ ("Single", "single"), ("Married Filing Jointly", "married_joint"), ("Married Filing Separately", "married_separate"), ("Head of Household", "head_of_household"), ], value="single", label="Filing Status", info="Your tax filing status" ) tax_annual_income = gr.Slider( minimum=0, maximum=1000000, value=75000, step=5000, label="Annual Income ($)", info="Total taxable income" ) tax_cost_basis_method = gr.Dropdown( choices=[ ("First In, First Out (FIFO)", "fifo"), ("Last In, First Out (LIFO)", "lifo"), ("Highest In, First Out (HIFO)", "hifo"), ("Average Cost", "average"), ], value="fifo", label="Cost Basis Method", info="Method for calculating gains/losses" ) # Cost basis input section gr.Markdown("### Purchase Information") gr.Markdown( "Enter your purchase details for accurate tax calculations. " "Click 'Load Holdings' to pre-populate with current holdings, then fill in purchase prices and dates." ) with gr.Row(): tax_load_holdings_btn = gr.Button("Load Holdings", size="sm") gr.Markdown( "*Accepted date formats: 9/3/17, Sep 3, 2017, 2017-09-03, etc.*", elem_classes="text-muted" ) tax_cost_basis_input = gr.Dataframe( headers=["Ticker", "Shares", "Purchase Price", "Purchase Date"], datatype=["str", "number", "number", "str"], col_count=(4, "fixed"), row_count=(5, "dynamic"), interactive=True, label="Cost Basis Information", value=[], ) tax_calculate_btn = gr.Button("Calculate Tax Impact", variant="primary") # Bottom row: Analysis output tax_analysis_output = gr.Markdown("") # Tab 4: Stress Testing with gr.Tab("🎲 Stress Testing"): gr.Markdown("Test your portfolio's resilience against historical crises and market scenarios.") with gr.Row(): with gr.Column(scale=1): scenario_choices = [ ("Monte Carlo Simulation (10,000 paths)", "monte_carlo"), ("All Scenarios Comparison", "all_scenarios"), ("2008 Financial Crisis", "2008_financial_crisis"), ("COVID-19 Pandemic (2020)", "covid_2020"), ("Dot-com Bubble (2000-2002)", "dotcom_bubble"), ("European Debt Crisis (2011)", "european_debt_2011"), ("China Devaluation (2015)", "china_2015"), ("Inflation Shock (2022)", "inflation_2022"), ("Severe Recession (Hypothetical)", "severe_recession"), ("Stagflation (Hypothetical)", "stagflation"), ("Flash Crash (Hypothetical)", "flash_crash"), ] stress_scenario_dropdown = gr.Dropdown( choices=scenario_choices, value="monte_carlo", label="Select Stress Scenario", info="Choose a historical crisis or simulation method" ) with gr.Row(): stress_n_sims = gr.Slider( minimum=1000, maximum=50000, value=10000, step=1000, label="Monte Carlo Simulations", info="More simulations = more accuracy (slower)" ) stress_horizon = gr.Slider( minimum=30, maximum=756, value=252, step=30, label="Time Horizon (days)", info="1 year = 252 trading days" ) stress_test_btn = gr.Button("Run Stress Test", variant="primary", size="lg") # Stress test results with gr.Column(): stress_summary = gr.Markdown("", visible=True) with gr.Tabs(): with gr.Tab("Dashboard"): stress_dashboard_plot = gr.Plot(label="Stress Test Dashboard") with gr.Tab("Monte Carlo Paths"): stress_mc_plot = gr.Plot(label="Monte Carlo Simulation Paths") with gr.Tab("Scenario Comparison"): stress_scenario_plot = gr.Plot(label="All Scenarios Comparison") with gr.Tab("Drawdown Analysis"): stress_drawdown_plot = gr.Plot(label="Maximum Drawdown Distribution") # Tab 5: History (embedded) with gr.Tab("📜 History"): gr.Markdown("View your previous portfolio analyses") # Search and filters with gr.Row(): history_search_results = gr.Textbox( placeholder="Search by ticker, date...", label="Search", scale=3, elem_id="history-search-results" ) history_date_filter_results = gr.Dropdown( choices=[ ("All Time", "all"), ("Last 7 Days", "7d"), ("Last 30 Days", "30d"), ("Last 90 Days", "90d"), ], value="all", label="Date Range", scale=1 ) history_refresh_btn_results = gr.Button("🔄 Refresh", size="sm", scale=1) history_table_results = gr.Dataframe( headers=["ID", "Date", "Holdings", "Risk Tolerance", "AI Synthesis Preview"], datatype=["str", "str", "str", "str", "str"], interactive=False, wrap=True, elem_id="history-table-results", column_widths=["5%", "15%", "25%", "15%", "40%"] ) # Pagination controls with gr.Row(): history_prev_btn_results = gr.Button("← Previous", size="sm", scale=1) history_page_info_results = gr.Markdown("Page 1 of 1", elem_id="history-page-info-results") history_next_btn_results = gr.Button("Next →", size="sm", scale=1) with gr.Accordion("Selected Analysis Details", open=False): history_details_output_results = gr.Markdown("") # History Page (Enhancement #4 - Historical Analysis Storage) with gr.Group(visible=False) as history_page: gr.Markdown("### Analysis History") gr.Markdown("View your previous portfolio analyses") with gr.Row(): with gr.Column(scale=4): # Search and filters with gr.Row(): history_search = gr.Textbox( placeholder="Search by ticker, date...", label="Search", scale=3, elem_id="history-search" ) history_date_filter = gr.Dropdown( choices=[ ("All Time", "all"), ("Last 7 Days", "7d"), ("Last 30 Days", "30d"), ("Last 90 Days", "90d"), ], value="all", label="Date Range", scale=1 ) history_refresh_btn = gr.Button("🔄 Refresh", size="sm", scale=1) with gr.Column(scale=1): back_to_input_btn = gr.Button("New Analysis", variant="secondary") history_table = gr.Dataframe( headers=["ID", "Date", "Holdings", "Risk Tolerance", "AI Synthesis Preview"], datatype=["str", "str", "str", "str", "str"], interactive=False, wrap=True, elem_id="history-table", column_widths=["5%", "15%", "25%", "15%", "40%"] ) # Pagination controls with gr.Row(): history_prev_btn = gr.Button("← Previous", size="sm", scale=1) history_page_info = gr.Markdown("Page 1 of 1", elem_id="history-page-info") history_next_btn = gr.Button("Next →", size="sm", scale=1) with gr.Accordion("Selected Analysis Details", open=False) as history_details: history_details_output = gr.Markdown("") view_history_btn = gr.Button("View Analysis History", variant="secondary", visible=False) # Event handlers def show_input_page(): return { task_page: gr.update(visible=False), input_page: gr.update(visible=True), results_page: gr.update(visible=False), history_page: gr.update(visible=False), build_page: gr.update(visible=False), compare_page: gr.update(visible=False), test_page: gr.update(visible=False) } def show_task_page(): return { task_page: gr.update(visible=True), input_page: gr.update(visible=False), results_page: gr.update(visible=False), history_page: gr.update(visible=False), build_page: gr.update(visible=False), compare_page: gr.update(visible=False), test_page: gr.update(visible=False) } def show_build_page(): return { task_page: gr.update(visible=False), input_page: gr.update(visible=False), results_page: gr.update(visible=False), history_page: gr.update(visible=False), build_page: gr.update(visible=True), compare_page: gr.update(visible=False), test_page: gr.update(visible=False) } def show_compare_page(): return { task_page: gr.update(visible=False), input_page: gr.update(visible=False), results_page: gr.update(visible=False), history_page: gr.update(visible=False), build_page: gr.update(visible=False), compare_page: gr.update(visible=True), test_page: gr.update(visible=False) } def show_test_page(): return { task_page: gr.update(visible=False), input_page: gr.update(visible=False), results_page: gr.update(visible=False), history_page: gr.update(visible=False), build_page: gr.update(visible=False), compare_page: gr.update(visible=False), test_page: gr.update(visible=True) } # ============================================================ # AUDIO GENERATION HANDLERS # ============================================================ async def generate_analysis_audio(): """Generate audio narration for portfolio analysis on-demand.""" global LAST_ANALYSIS_STATE if not LAST_ANALYSIS_STATE: logger.warning("No analysis state available for audio generation") return ( gr.update(visible=False), # audio_player gr.update(visible=True), # button ) try: from backend.audio.tts_service import TTSService tts = TTSService() if not tts.is_available(): logger.warning("TTS service not available") return ( gr.update(visible=False), gr.update(visible=True), ) # Extract analysis text ai_synthesis = LAST_ANALYSIS_STATE.get("ai_synthesis", "") recommendations = LAST_ANALYSIS_STATE.get("recommendations", []) if not ai_synthesis and not recommendations: logger.warning("No analysis content available") return ( gr.update(visible=False), gr.update(visible=True), ) logger.info("Generating audio for analysis...") # Generate audio audio_path = await tts.generate_analysis_narration( analysis_text=ai_synthesis[:1000], # Limit to 1000 chars recommendations=recommendations ) logger.info(f"Audio generated: {audio_path}") return ( gr.update(value=audio_path, visible=True), # Show audio player gr.update(visible=True), # Keep button visible ) except Exception as e: logger.error(f"Audio generation failed: {e}") return ( gr.update(visible=False), gr.update(visible=True), ) async def generate_build_audio(): """Generate audio narration for built portfolio on-demand.""" global LAST_BUILD_RESULT if not LAST_BUILD_RESULT: logger.warning("No build result available for audio generation") return ( gr.update(visible=False), gr.update(visible=True), ) try: from backend.audio.tts_service import TTSService tts = TTSService() if not tts.is_available(): logger.warning("TTS service not available") return ( gr.update(visible=False), gr.update(visible=True), ) # Extract portfolio summary portfolio_summary = LAST_BUILD_RESULT.get("summary", "") holdings = LAST_BUILD_RESULT.get("holdings", []) if not portfolio_summary: logger.warning("No portfolio summary available") return ( gr.update(visible=False), gr.update(visible=True), ) logger.info("Generating audio for portfolio...") # Generate audio audio_path = await tts.generate_portfolio_narration( portfolio_summary=portfolio_summary[:1000], holdings=holdings ) logger.info(f"Audio generated: {audio_path}") return ( gr.update(value=audio_path, visible=True), gr.update(visible=True), ) except Exception as e: logger.error(f"Audio generation failed: {e}") return ( gr.update(visible=False), gr.update(visible=True), ) async def generate_debate_audio(): """Generate multi-speaker debate audio on-demand.""" global LAST_DEBATE_DATA logger.info(f"generate_debate_audio called. LAST_DEBATE_DATA exists: {LAST_DEBATE_DATA is not None}") if LAST_DEBATE_DATA: logger.info(f"Debate data keys: {LAST_DEBATE_DATA.keys()}") logger.info(f"Bull case length: {len(LAST_DEBATE_DATA.get('bull_case', ''))}") logger.info(f"Bear case length: {len(LAST_DEBATE_DATA.get('bear_case', ''))}") logger.info(f"Consensus length: {len(LAST_DEBATE_DATA.get('consensus', ''))}") if not LAST_DEBATE_DATA: logger.warning("No debate data available for audio generation") return ( gr.update(visible=False), gr.update(visible=True), ) try: from backend.audio.tts_service import DebateAudioGenerator debate_gen = DebateAudioGenerator() if not debate_gen.is_available(): logger.warning("Debate audio generator not available") return ( gr.update(visible=False), gr.update(visible=True), ) # Extract debate data bull_case = LAST_DEBATE_DATA.get("bull_case", "") bear_case = LAST_DEBATE_DATA.get("bear_case", "") consensus = LAST_DEBATE_DATA.get("consensus", "") bull_confidence = LAST_DEBATE_DATA.get("bull_confidence") bear_confidence = LAST_DEBATE_DATA.get("bear_confidence") stance = LAST_DEBATE_DATA.get("stance") if not bull_case or not bear_case or not consensus: logger.warning("Incomplete debate data") return ( gr.update(visible=False), gr.update(visible=True), ) logger.info("Generating debate audio...") # Generate multi-speaker debate audio audio_path = await debate_gen.generate_debate_audio( bull_case=bull_case[:1000], bear_case=bear_case[:1000], consensus=consensus[:1000], bull_confidence=bull_confidence, bear_confidence=bear_confidence, stance=stance ) logger.info(f"Debate audio generated: {audio_path}") logger.info(f"Audio file exists: {os.path.exists(audio_path) if audio_path else False}") logger.info(f"Returning audio player update with visible=True") return ( gr.update(value=audio_path, visible=True), gr.update(visible=True), ) except Exception as e: logger.error(f"Debate audio generation failed: {e}") return ( gr.update(visible=False), gr.update(visible=True), ) async def handle_build_portfolio(goals, risk_tolerance, constraints, session_state): """Handle the Build Portfolio workflow with streaming updates. Args: goals: List of selected investment goals risk_tolerance: Risk tolerance score 1-10 constraints: User constraints text session_state: User session Yields: Tuple of UI updates: (agent_chat, results_container, status) """ global LAST_BUILD_RESULT logger.info(f"handle_build_portfolio called") logger.info(f"Input types - goals: {type(goals).__name__}, risk_tolerance: {type(risk_tolerance).__name__}, constraints: {type(constraints).__name__}") logger.info(f"Input values - goals: {goals!r}, risk_tolerance: {risk_tolerance!r}, constraints: {constraints!r}") if not goals: logger.warning(f"No goals provided") yield ( gr.update(value=[], visible=False), # build_agent_chat (empty, hidden) gr.update(visible=True), # build_results_container "Please select at least one investment goal.", # build_status gr.update(visible=False) # build_audio_btn (hide on error) ) return try: logger.info(f"Initialising MCP and workflow routers") # Initialise MCP router and workflow router from backend.mcp_router import MCPRouter from backend.agents.workflow_router import WorkflowRouter mcp_router = MCPRouter() workflow_router = WorkflowRouter(mcp_router) # Normalise input types goals_list = goals if isinstance(goals, list) else [goals] if goals else [] constraints_str = ", ".join(constraints) if isinstance(constraints, list) else (constraints or "") logger.info(f"After normalisation - goals_list: {goals_list!r} (type: {type(goals_list).__name__})") logger.info(f"After normalisation - constraints_str: {constraints_str!r} (type: {type(constraints_str).__name__})") # Stream the build workflow chat_messages = [] message_count = 0 logger.info(f"Starting stream from route_build_stream") async for message in workflow_router.route_build_stream( goals=goals_list, risk_tolerance=int(risk_tolerance), constraints=constraints_str ): try: message_count += 1 logger.debug(f"Received message #{message_count} from stream") logger.debug(f"Message type: {type(message).__name__}") logger.debug(f"Message keys: {message.keys() if isinstance(message, dict) else 'N/A'}") logger.debug(f"Message content type: {type(message.get('content')).__name__ if isinstance(message, dict) else 'N/A'}") logger.debug(f"Message: {message!r}") chat_messages.append(message) logger.debug(f"Message appended to chat_messages. Total messages: {len(chat_messages)}") try: # Stream each message to the chatbot in real-time logger.debug(f"About to yield UI update with {len(chat_messages)} messages") yield ( gr.update(value=chat_messages, visible=True), # build_agent_chat (visible, growing list) gr.update(visible=False), # build_results_container (hidden during streaming) "", # build_status gr.update(visible=False) # build_audio_btn (hide during streaming) ) logger.debug(f"UI update yielded successfully") except Exception as e: logger.error(f"Error yielding UI update: {e}") logger.error(f"chat_messages type: {type(chat_messages).__name__}") logger.error(f"chat_messages length: {len(chat_messages)}") logger.error(f"Last message: {chat_messages[-1] if chat_messages else 'N/A'}") raise except Exception as e: logger.error(f"Error processing message #{message_count}") logger.error(f"Message: {message!r}") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Error message: {str(e)}") logger.error(f"Full traceback:\n{traceback.format_exc()}") raise # Store build result for audio generation if chat_messages: final_message = chat_messages[-1] if isinstance(final_message, dict) and "metadata" in final_message: portfolio_data = final_message.get("metadata", {}).get("portfolio", {}) # Handle both dict and list formats if isinstance(portfolio_data, list): holdings = portfolio_data elif isinstance(portfolio_data, dict): holdings = portfolio_data.get("holdings", []) else: holdings = [] LAST_BUILD_RESULT = { "summary": final_message.get("content", ""), "holdings": holdings, "reasoning": final_message.get("metadata", {}).get("reasoning_trace", []) } logger.info("Build result stored for audio generation") # Final yield: Show results container logger.info(f"Completed streaming. Yielding final result with {len(chat_messages)} messages") yield ( gr.update(value=chat_messages, visible=True), # build_agent_chat (final state, visible) gr.update(visible=True), # build_results_container (show results) "Portfolio built successfully!", # build_status gr.update(visible=True) # build_audio_btn (show audio button) ) except Exception as e: logger.error(f"=== BUILD PORTFOLIO ERROR ===") logger.error(f"Error type: {type(e).__name__}") logger.error(f"Error message: {str(e)}") # Extract detailed traceback information import traceback import sys tb = sys.exc_info()[2] if tb: tb_list = traceback.extract_tb(tb) for frame_summary in tb_list: logger.error(f" File: {frame_summary.filename}") logger.error(f" Function: {frame_summary.name}") logger.error(f" Line {frame_summary.lineno}: {frame_summary.line}") logger.error(f"Full traceback:\n{traceback.format_exc()}") logger.error(f"message_count at error: {message_count if 'message_count' in locals() else 'N/A'}") logger.error(f"chat_messages at error: {len(chat_messages) if 'chat_messages' in locals() else 'N/A'} messages") logger.error(f"=== END BUILD PORTFOLIO ERROR ===") yield ( gr.update(value=[], visible=False), # build_agent_chat (empty, hidden on error) gr.update(visible=True), # build_results_container f"Error building portfolio: {str(e)}", # build_status gr.update(visible=False) # build_audio_btn (hide on error) ) def handle_build_accept(portfolio_table): """Accept built portfolio and populate input for analysis. Args: portfolio_table: DataFrame with ticker and allocation data Returns: Tuple of values for each output component """ if portfolio_table is None or len(portfolio_table) == 0: return ( "", # portfolio_input gr.update(visible=False), # task_page gr.update(visible=True), # input_page gr.update(visible=False), # results_page gr.update(visible=False), # history_page gr.update(visible=False), # build_page gr.update(visible=False), # compare_page gr.update(visible=False) # test_page ) # Convert portfolio table to input format lines = [] for row in portfolio_table: ticker = row[0] if len(row) > 0 else "" allocation = row[1] if len(row) > 1 else 0 if ticker: lines.append(f"{ticker} {allocation}%") portfolio_text = "\n".join(lines) return ( portfolio_text, # portfolio_input gr.update(visible=False), # task_page gr.update(visible=True), # input_page gr.update(visible=False), # results_page gr.update(visible=False), # history_page gr.update(visible=False), # build_page gr.update(visible=False), # compare_page gr.update(visible=False) # test_page ) async def handle_compare_portfolio(portfolio_text, session_state): """Handle the Compare Strategies workflow with streaming debate. Args: portfolio_text: Raw portfolio text input session_state: User session Yields: Tuple of UI updates: (debate_chat, results_container, status, bull_case, bull_conf, bear_case, bear_conf, consensus, stance, debate_transcript) """ global LAST_DEBATE_DATA if not portfolio_text or not portfolio_text.strip(): yield ( gr.update(value=[], visible=False), # compare_debate_chat (empty, hidden) gr.update(visible=True), # compare_results_container "Please enter your portfolio holdings.", # compare_status "", # compare_bull_case 0, # compare_bull_confidence "", # compare_bear_case 0, # compare_bear_confidence "", # compare_consensus "", # compare_stance [], # compare_debate_transcript gr.update(visible=False) # compare_audio_btn (hide on error) ) return # Parse portfolio holdings = parse_portfolio_input(portfolio_text) if not holdings: yield ( gr.update(value=[], visible=False), # compare_debate_chat (empty, hidden) gr.update(visible=True), # compare_results_container "Could not parse portfolio. Please check format.", # compare_status "", 0, "", 0, "", "", [], gr.update(visible=False) # compare_audio_btn (hide on error) ) return try: # Initialise MCP router and workflow router from backend.mcp_router import MCPRouter from backend.agents.workflow_router import WorkflowRouter mcp_router = MCPRouter() workflow_router = WorkflowRouter(mcp_router) # Stream the compare workflow chat_messages = [] bull_case_data = {} bear_case_data = {} consensus_data = {} debate_transcript = [] async for message in workflow_router.route_compare_stream(holdings=holdings): chat_messages.append(message) # Stream each message to the chatbot in real-time yield ( gr.update(value=chat_messages, visible=True), # compare_debate_chat (visible, growing list) gr.update(visible=False), # compare_results_container (hidden during streaming) "", # compare_status "", # compare_bull_case 0, # compare_bull_confidence "", # compare_bear_case 0, # compare_bear_confidence "", # compare_consensus "", # compare_stance [], # compare_debate_transcript gr.update(visible=False) # compare_audio_btn (hide during streaming) ) # Extract data from final consensus message metadata = message.get("metadata", {}) if metadata and "Consensus" in metadata.get("title", ""): # This is the final consensus - parse it content = message.get("content", "") # Store for final display consensus_data = {"recommendation": content, "stance": "Mixed"} # Extract bull/bear from previous messages for msg in chat_messages: msg_meta = msg.get("metadata", {}) title = msg_meta.get("title", "") if "Bull Researcher" in title: bull_case_data = { "thesis": msg.get("content", ""), "confidence": 65 # Default, could parse from title } elif "Bear Researcher" in title: bear_case_data = { "thesis": msg.get("content", ""), "confidence": 60 # Default, could parse from title } # Store debate data for audio generation if consensus_data and bull_case_data and bear_case_data: LAST_DEBATE_DATA = { "bull_case": bull_case_data.get("thesis", ""), "bear_case": bear_case_data.get("thesis", ""), "consensus": consensus_data.get("recommendation", ""), "bull_confidence": bull_case_data.get("confidence"), "bear_confidence": bear_case_data.get("confidence"), "stance": consensus_data.get("stance", "Mixed") } logger.info("Debate data stored for audio generation") # Final yield: Show results container with analysis yield ( gr.update(value=chat_messages, visible=True), # compare_debate_chat (final state, visible) gr.update(visible=True), # compare_results_container (show results) "Analysis complete!", # compare_status bull_case_data.get("thesis", ""), # compare_bull_case bull_case_data.get("confidence", 0), # compare_bull_confidence bear_case_data.get("thesis", ""), # compare_bear_case bear_case_data.get("confidence", 0), # compare_bear_confidence consensus_data.get("recommendation", ""), # compare_consensus consensus_data.get("stance", "Mixed"), # compare_stance chat_messages, # compare_debate_transcript (use chat messages) gr.update(visible=True) # compare_audio_btn (show audio button) ) except Exception as e: logger.error(f"Compare portfolio error: {e}") yield ( gr.update(value=[], visible=False), # compare_debate_chat (empty, hidden on error) gr.update(visible=True), # compare_results_container f"Error: {str(e)}", # compare_status "", 0, "", 0, "", "", [], gr.update(visible=False) # compare_audio_btn (hide on error) ) async def handle_test_changes(portfolio_text, changes_text, portfolio_value, session_state): """Handle the Test Changes workflow. Args: portfolio_text: Current portfolio as text (ticker weight%) changes_text: Proposed changes as text portfolio_value: Total portfolio value session_state: User session Returns: Tuple of UI updates for test results """ try: if not portfolio_text.strip(): return ( gr.update(visible=True), "Please enter your current portfolio.", [], [], [], [], "", "" ) if not changes_text.strip(): return ( gr.update(visible=True), "Please enter proposed changes.", [], [], [], [], "", "" ) # Parse current portfolio current_portfolio = {} for line in portfolio_text.strip().split("\n"): line = line.strip() if not line: continue parts = line.replace("%", "").split() if len(parts) >= 2: ticker = parts[0].upper() try: weight = float(parts[1]) current_portfolio[ticker] = {"weight": weight} except ValueError: continue # Parse proposed changes proposed_changes = [] for line in changes_text.strip().split("\n"): line = line.strip().lower() if not line: continue parts = line.split() if len(parts) >= 3: action = parts[0] ticker = parts[1].upper() try: amount = float(parts[2]) if action in ["buy", "sell"]: proposed_changes.append({ "action": action, "ticker": ticker, "amount": amount }) except ValueError: continue if not current_portfolio: return ( gr.update(visible=True), "Could not parse portfolio. Use format: TICKER WEIGHT%", [], [], [], [], "", "" ) if not proposed_changes: return ( gr.update(visible=True), "Could not parse changes. Use format: buy/sell TICKER AMOUNT", [], [], [], [], "", "" ) # Initialise MCP router and workflow router from backend.mcp_router import MCPRouter from backend.agents.workflow_router import WorkflowRouter mcp_router = MCPRouter() workflow_router = WorkflowRouter(mcp_router) # Run the test workflow result = await workflow_router.route_test( current_portfolio=current_portfolio, proposed_changes=proposed_changes, portfolio_value=float(portfolio_value) ) # Format current metrics for display current_metrics_data = [] for metric, value in result.get("current", {}).get("metrics", {}).items(): if isinstance(value, (int, float)): current_metrics_data.append([metric.replace("_", " ").title(), f"{value:.4f}"]) # Format simulated metrics for display simulated_metrics_data = [] for metric, value in result.get("simulated", {}).get("metrics", {}).items(): if isinstance(value, (int, float)): simulated_metrics_data.append([metric.replace("_", " ").title(), f"{value:.4f}"]) # Format impact summary impact_data = [] for metric, impact in result.get("impact", {}).items(): impact_data.append([ metric.replace("_", " ").title(), f"{impact.get('current', 0):.4f}", f"{impact.get('simulated', 0):.4f}", f"{impact.get('delta', 0):+.4f}", f"{impact.get('pct_change', 0):+.2f}%" ]) # Format stress test comparison stress_data = [] for scenario, data in result.get("stress_comparison", {}).items(): stress_data.append([ scenario.replace("_", " ").title(), f"{data.get('shock', 0):.0f}%", f"${data.get('current_loss', 0):,.0f}", f"${data.get('simulated_loss', 0):,.0f}", f"${data.get('improvement', 0):+,.0f}" ]) # Format recommendations recommendations_text = "### Recommendations\n\n" for rec in result.get("recommendations", []): recommendations_text += f"- {rec}\n" # Format assessment assessment_text = f"### Overall Assessment\n\n**{result.get('overall_assessment', 'No assessment')}**" return ( gr.update(visible=True), "Simulation complete!", current_metrics_data, simulated_metrics_data, impact_data, stress_data, assessment_text, recommendations_text ) except Exception as e: logger.error(f"Test changes error: {e}") return ( gr.update(visible=True), f"Error running simulation: {str(e)}", [], [], [], [], "", "" ) def sync_handle_test_changes(portfolio_text, changes_text, portfolio_value, session_state): """Synchronous wrapper for handle_test_changes.""" return asyncio.run(handle_test_changes(portfolio_text, changes_text, portfolio_value, session_state)) async def load_history(session_state): """Load analysis history from database. Demo users do not have persistent history - their analyses are ephemeral and only exist for the current session. """ try: is_demo = session_state.get("is_demo", False) if session_state else False # Demo users don't have persistent history if is_demo: logger.info("Demo mode: No persistent history available (analyses are ephemeral)") return [], "Demo mode does not save history. Sign up for a free account to save your analyses!" session = UserSession.from_dict(session_state) # Must be authenticated user with valid user_id if not session or not session.user_id: logger.error("No valid user_id for loading history") return [], "Please sign in to view your analysis history" user_id = session.user_id logger.info(f"Loading history for user: {user_id}") history = await db.get_analysis_history(user_id, limit=20) if not history: logger.info(f"No history found for user {user_id}") return [], "No previous analyses found" logger.info(f"Loaded {len(history)} analyses for user {user_id}") # Format history for dataframe # Store analysis records globally for row selection global HISTORY_RECORDS HISTORY_RECORDS = history rows = [] for i, record in enumerate(history): # Format holdings as comma-separated tickers holdings_str = ", ".join([h.get("ticker", "?") for h in record.get("holdings_snapshot", [])]) # Truncate AI synthesis for preview synthesis_preview = record.get("ai_synthesis", "") rows.append([ str(i), # Row index for selection record.get("created_at", "").split("T")[0], # Date only holdings_str, record.get("portfolios", {}).get("risk_tolerance", "moderate"), synthesis_preview ]) return rows, f"Loaded {len(history)} previous analyses" except Exception as e: logger.error(f"Failed to load history: {e}") return [], f"Error loading history: {str(e)}" def sync_load_history(session_state): """Synchronous wrapper for load_history.""" return asyncio.run(load_history(session_state)) async def filter_history(session_state, search_query: str, date_filter: str, page: int = 1, page_size: int = 10): """Filter and paginate history based on search query and date range. Args: session_state: User session dictionary search_query: Search string to filter by ticker or date date_filter: Date range filter ("all", "7d", "30d", "90d") page: Current page number (1-indexed) page_size: Number of items per page Returns: Tuple of (filtered_rows, page_info_message) """ from datetime import datetime, timedelta try: is_demo = session_state.get("is_demo", False) if session_state else False if is_demo: return [], "Page 1 of 1" session = UserSession.from_dict(session_state) if not session or not session.user_id: return [], "Page 1 of 1" user_id = session.user_id # Load full history (increased limit for filtering) full_history = await db.get_analysis_history(user_id, limit=100) if not full_history: return [], "Page 1 of 1" # Apply date filter filtered_history = [] now = datetime.now() cutoff = None if date_filter == "7d": cutoff = now - timedelta(days=7) elif date_filter == "30d": cutoff = now - timedelta(days=30) elif date_filter == "90d": cutoff = now - timedelta(days=90) for record in full_history: # Apply date filter if cutoff: try: created_at_str = record.get('created_at', '') if created_at_str: # Handle both ISO format with and without timezone if 'T' in created_at_str: created_at_str = created_at_str.split('.')[0] # Remove microseconds created_at_str = created_at_str.replace('Z', '') # Remove timezone analysis_date = datetime.fromisoformat(created_at_str) else: analysis_date = datetime.fromisoformat(created_at_str) if analysis_date < cutoff: continue except Exception as e: logger.warning(f"Date parsing error: {e}") continue # Apply search filter if search_query and search_query.strip(): query_lower = search_query.lower().strip() # Search in tickers holdings = record.get('holdings_snapshot', []) tickers = ' '.join([h.get('ticker', '') for h in holdings]).lower() # Search in date date_str = record.get('created_at', '').lower() # Search in risk tolerance risk = record.get('portfolios', {}).get('risk_tolerance', '').lower() # If query not found in any field, skip this record if query_lower not in tickers and query_lower not in date_str and query_lower not in risk: continue filtered_history.append(record) # Store filtered records globally for row selection global HISTORY_RECORDS HISTORY_RECORDS = filtered_history # Calculate pagination total_items = len(filtered_history) total_pages = max(1, (total_items + page_size - 1) // page_size) page = max(1, min(page, total_pages)) # Clamp page to valid range start_idx = (page - 1) * page_size end_idx = min(start_idx + page_size, total_items) page_items = filtered_history[start_idx:end_idx] # Format for display rows = [] for i, record in enumerate(page_items, start=start_idx): holdings_str = ", ".join([h.get("ticker", "?") for h in record.get("holdings_snapshot", [])]) synthesis_preview = record.get("ai_synthesis", "") rows.append([ str(i), # Global index for selection record.get("created_at", "").split("T")[0], holdings_str, record.get("portfolios", {}).get("risk_tolerance", "moderate"), synthesis_preview ]) page_info = f"Page {page} of {total_pages} ({total_items} total)" return rows, page_info except Exception as e: logger.error(f"Failed to filter history: {e}") return [], "Page 1 of 1" def sync_filter_history(session_state, search_query: str, date_filter: str, page: int = 1, page_size: int = 10): """Synchronous wrapper for filter_history.""" return asyncio.run(filter_history(session_state, search_query, date_filter, page, page_size)) def view_historical_analysis(evt: gr.SelectData): """View full details of a selected historical analysis. Args: evt: Gradio SelectData event containing row index Returns: Markdown string with full analysis details """ global HISTORY_RECORDS try: # Get the row index from the selected row row_index = int(evt.value) # First column contains the index if not HISTORY_RECORDS or row_index >= len(HISTORY_RECORDS): return "**Error**: Could not load analysis. Please refresh the history table." record = HISTORY_RECORDS[row_index] # Format detailed analysis view details = f"""# Analysis Details **Date**: {record.get('created_at', 'Unknown').split('T')[0]} **Portfolio**: {record.get('portfolios', {}).get('name', 'Unnamed Portfolio')} **Risk Tolerance**: {record.get('portfolios', {}).get('risk_tolerance', 'moderate')} --- ## Holdings Snapshot """ # Format holdings as table holdings = record.get('holdings_snapshot', []) if holdings: details += "\n| Ticker | Shares | Value |\n|--------|--------|-------|\n" for holding in holdings: details += f"| {holding.get('ticker', '?')} | {holding.get('shares', 0)} | ${holding.get('value', 0):,.2f} |\n" else: details += "\n*No holdings data available*\n" details += f""" --- ## AI Analysis {record.get('ai_synthesis', '*No analysis available*')} --- ## Recommendations """ recommendations = record.get('recommendations', []) if recommendations: for i, rec in enumerate(recommendations, 1): details += f"\n{i}. {rec}" else: details += "\n*No recommendations available*" details += f""" --- **Model**: {record.get('model_version', 'Unknown')} **Execution Time**: {record.get('execution_time_ms', 0):,.0f}ms """ return details except Exception as e: logger.error(f"Failed to view historical analysis: {e}") return f"**Error**: Failed to load analysis details: {str(e)}" def export_current_analysis_csv(): """Return pre-generated CSV file path. Returns: Path to temporary CSV file, or None if no analysis available """ global LAST_EXPORT_CSV_PATH if not LAST_EXPORT_CSV_PATH: logger.warning("No CSV export available") return None logger.info(f"Returning pre-generated CSV: {LAST_EXPORT_CSV_PATH}") return LAST_EXPORT_CSV_PATH def export_current_analysis_pdf(): """Return pre-generated PDF file path. Returns: Path to temporary PDF file, or None if no analysis available """ global LAST_EXPORT_PDF_PATH if not LAST_EXPORT_PDF_PATH: logger.warning("No PDF export available") return None logger.info(f"Returning pre-generated PDF: {LAST_EXPORT_PDF_PATH}") return LAST_EXPORT_PDF_PATH def validate_email_format(email: str) -> str: """Validate email format in real-time. Args: email: Email address to validate Returns: Validation message """ import re if not email: return "" email_regex = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$' if re.match(email_regex, email): return "✓ Valid email format" else: return "❌ Invalid email format" def validate_password_match(password: str, confirm: str) -> str: """Check if passwords match in real-time. Args: password: Password field value confirm: Confirmation field value Returns: Validation message """ if not confirm: return "" if password == confirm: return "✓ Passwords match" else: return "❌ Passwords do not match" def check_password_strength(password: str) -> str: """Real-time password strength indicator. Args: password: Password to evaluate Returns: Strength indicator with feedback """ import re if not password: return "" score = 0 feedback = [] # Length check (main NIST requirement) if len(password) >= 15: score += 3 # Length is most important elif len(password) >= 12: score += 2 feedback.append("15+ characters") elif len(password) >= 8: score += 1 feedback.append("15+ characters") else: feedback.append("15+ characters") # Optional complexity (not required by NIST) if re.search(r'[A-Z]', password): score += 1 if re.search(r'[a-z]', password): score += 1 if re.search(r'\d', password): score += 1 if re.search(r'[!@#$%^&*(),.?":{}|<>]', password): score += 1 # Generate feedback message if score <= 2: return "🔴 **Weak** - Add: " + ", ".join(feedback) if feedback else "🔴 **Weak**" elif score <= 4: return "🟠 **Moderate** - " + (("Add: " + ", ".join(feedback)) if feedback else "Consider 15+ characters") elif score <= 6: return "🟡 **Good** - " + (("Add: " + ", ".join(feedback)) if feedback else "Meets requirements") else: return "🟢 **Strong** - Exceeds requirements" def show_history_page(): """Navigate to history page.""" return { task_page: gr.update(visible=False), input_page: gr.update(visible=False), results_page: gr.update(visible=False), history_page: gr.update(visible=True) } def load_tax_holdings(): """Load current portfolio holdings into the cost basis dataframe. Returns: List of lists with [ticker, shares, "", ""] for each holding. """ global LAST_ANALYSIS_STATE if not LAST_ANALYSIS_STATE or "holdings" not in LAST_ANALYSIS_STATE: return [] holdings = LAST_ANALYSIS_STATE.get("holdings", []) dataframe_data = [] for holding in holdings: ticker = holding.get("ticker", "") quantity = holding.get("quantity", 0) if ticker and quantity > 0: dataframe_data.append([ticker, quantity, None, ""]) return dataframe_data def calculate_tax_impact( filing_status: str, annual_income: float, cost_basis_method: str, cost_basis_data: list ): """Calculate tax impact for current portfolio holdings. Args: filing_status: Tax filing status annual_income: Annual taxable income cost_basis_method: Cost basis calculation method cost_basis_data: Dataframe rows with ticker, shares, purchase_price, purchase_date Returns: Formatted markdown tax analysis report """ global LAST_ANALYSIS_STATE if not LAST_ANALYSIS_STATE or "holdings" not in LAST_ANALYSIS_STATE: return """# Tax Impact Analysis **Error**: Please run a portfolio analysis first before calculating tax impact. Use the "Analyse Portfolio" button to analyse your portfolio, then return here to view tax implications. """ try: holdings = LAST_ANALYSIS_STATE.get("holdings", []) # Parse cost basis dataframe into structured format user_cost_basis = [] # Handle pandas DataFrame from Gradio import pandas as pd if isinstance(cost_basis_data, pd.DataFrame): logger.info(f"Received DataFrame with {len(cost_basis_data)} rows") if not cost_basis_data.empty: # Convert DataFrame to list of lists cost_basis_rows = cost_basis_data.values.tolist() logger.info(f"DataFrame data: {cost_basis_rows}") else: cost_basis_rows = [] logger.info("DataFrame is empty") elif cost_basis_data is not None: # Already a list cost_basis_rows = cost_basis_data logger.info(f"Received list with {len(cost_basis_rows)} rows") else: cost_basis_rows = [] logger.info("No cost basis data provided") for row in cost_basis_rows: if len(row) >= 4: # Handle pandas nan values ticker_val = row[0] if pd.isna(ticker_val) or not str(ticker_val).strip(): continue ticker = str(ticker_val).strip().upper() shares = None if pd.isna(row[1]) else row[1] purchase_price = None if pd.isna(row[2]) else row[2] purchase_date = "" if pd.isna(row[3]) else str(row[3]).strip() # Only include rows with meaningful data if ticker and purchase_price is not None and purchase_date: user_cost_basis.append({ "ticker": ticker, "shares": shares, "purchase_price": purchase_price, "purchase_date": purchase_date, }) logger.info(f"Parsed {len(user_cost_basis)} valid cost basis entries: {user_cost_basis}") report = create_tax_analysis( holdings=holdings, filing_status=filing_status, annual_income=annual_income, cost_basis_method=cost_basis_method, user_cost_basis=user_cost_basis if user_cost_basis else None, ) return format_tax_analysis_output(report) except Exception as e: logger.error(f"Tax calculation error: {e}", exc_info=True) return f"""# Tax Impact Analysis **Error**: {str(e)} Please try again with different parameters. """ async def save_portfolio_input_text(user_id: str, portfolio_text: str): """Save portfolio input text for quick reload. Automatically keeps only last 3 entries per user via database trigger. """ try: await db.save_portfolio_input(user_id, portfolio_text) except Exception as e: logger.error(f"Failed to save portfolio input: {e}") async def get_past_portfolios_for_dropdown(user_id: str): """Get last 3 portfolio inputs formatted for dropdown.""" try: portfolios = await db.get_portfolio_inputs(user_id, limit=3) # Format for dropdown: (display_text, index) choices = [] for i, portfolio in enumerate(portfolios): # Generate preview from first few lines of description lines = [line.strip() for line in portfolio['description'].split('\n') if line.strip()][:3] tickers = [] for line in lines: parts = line.split() if parts: tickers.append(parts[0]) preview = ', '.join(tickers) total_lines = len([l for l in portfolio['description'].split('\n') if l.strip()]) if total_lines > 3: preview += f' +{total_lines - 3} more' choices.append((preview, i)) return choices except Exception as e: logger.error(f"Failed to load past portfolios: {e}") return [] async def load_portfolio_text(user_id: str, index: int): """Load portfolio text by index (0 = most recent).""" try: portfolios = await db.get_portfolio_inputs(user_id, limit=3) if 0 <= index < len(portfolios): return portfolios[index].get('description', '') return "" except Exception as e: logger.error(f"Failed to load portfolio text: {e}") return "" async def handle_analysis(session_state, portfolio_text, roast_mode, persona, request: gr.Request, progress=gr.Progress()): # Check authentication if not check_authentication(session_state): yield { input_page: gr.update(visible=False), loading_page: gr.update(visible=False), results_page: gr.update(visible=False), loading_message: "❌ Please sign in to analyse your portfolio", analysis_output: "", performance_metrics_output: "", allocation_plot: None, risk_plot: None, performance_plot: None, correlation_plot: None, optimization_plot: None, load_past_portfolio_dropdown: gr.skip(), export_pdf_btn: gr.skip(), export_csv_btn: gr.skip() } return # Enforce rate limiting if rate_limit_middleware: try: rate_limit_middleware.enforce(request, session_state=session_state) except Exception as e: logger.warning(f"Rate limit exceeded: {e}") yield { input_page: gr.update(visible=False), loading_page: gr.update(visible=False), results_page: gr.update(visible=False), loading_message: f"❌ {str(e)}", analysis_output: "", performance_metrics_output: "", allocation_plot: None, risk_plot: None, performance_plot: None, correlation_plot: None, optimization_plot: None, load_past_portfolio_dropdown: gr.skip(), export_pdf_btn: gr.skip(), export_csv_btn: gr.skip() } return # Auto-save portfolio input for quick reload (keep last 3) session = UserSession.from_dict(session_state) dropdown_choices = [] if session and session.user_id and portfolio_text.strip(): await save_portfolio_input_text(session.user_id, portfolio_text) dropdown_choices = await get_past_portfolios_for_dropdown(session.user_id) # Show loading page immediately yield { input_page: gr.update(visible=False), loading_page: gr.update(visible=True), results_page: gr.update(visible=False), loading_message: random.choice(LOADING_MESSAGES), analysis_output: "", performance_metrics_output: "", allocation_plot: None, risk_plot: None, performance_plot: None, correlation_plot: None, optimization_plot: None, load_past_portfolio_dropdown: gr.update(choices=dropdown_choices) if dropdown_choices else gr.skip(), export_pdf_btn: gr.skip(), export_csv_btn: gr.skip() } # Run analysis with progress updates page, analysis, perf_metrics, alloc, risk, perf, corr, opt, audio_btn = await run_analysis_with_ui_update( session_state, portfolio_text, roast_mode, persona, progress ) # Show results or return to input if page == "results": global LAST_EXPORT_PDF_PATH, LAST_EXPORT_CSV_PATH yield { input_page: gr.update(visible=False), loading_page: gr.update(visible=False), results_page: gr.update(visible=True), loading_message: "Analysis complete!", analysis_output: analysis, performance_metrics_output: perf_metrics, allocation_plot: alloc, risk_plot: risk, performance_plot: perf, correlation_plot: corr, optimization_plot: opt, analysis_audio_btn: audio_btn, load_past_portfolio_dropdown: gr.update(choices=dropdown_choices) if dropdown_choices else gr.skip(), export_pdf_btn: LAST_EXPORT_PDF_PATH, export_csv_btn: LAST_EXPORT_CSV_PATH } else: yield { input_page: gr.update(visible=True), loading_page: gr.update(visible=False), results_page: gr.update(visible=False), loading_message: "", analysis_output: analysis, performance_metrics_output: "", allocation_plot: None, risk_plot: None, performance_plot: None, correlation_plot: None, optimization_plot: None, analysis_audio_btn: audio_btn, load_past_portfolio_dropdown: gr.skip(), export_pdf_btn: gr.skip(), export_csv_btn: gr.skip() } # Wire up live preview on input change portfolio_input.change( update_live_preview, inputs=[portfolio_input], outputs=[preview_output] ) # Wire up refresh prices button refresh_prices_btn.click( fetch_and_update_preview, inputs=[portfolio_input], outputs=[preview_output] ) # Wire up past portfolio dropdown to load selected portfolio async def handle_load_past_portfolio(index, session): """Load selected past portfolio into input field.""" if index is None: return gr.skip() session_obj = UserSession.from_dict(session) if not session_obj or not session_obj.user_id: return "" portfolio_text = await load_portfolio_text(session_obj.user_id, index) return portfolio_text load_past_portfolio_dropdown.change( handle_load_past_portfolio, inputs=[load_past_portfolio_dropdown, session_state], outputs=[portfolio_input] ) analyse_btn.click( handle_analysis, inputs=[session_state, portfolio_input, roast_mode_toggle, persona_dropdown], outputs=[ input_page, loading_page, results_page, loading_message, analysis_output, performance_metrics_output, allocation_plot, risk_plot, performance_plot, correlation_plot, optimization_plot, load_past_portfolio_dropdown, export_pdf_btn, export_csv_btn, analysis_audio_btn # Audio button ], show_progress="full" ) # Export buttons - download directly (DownloadButton triggers browser download automatically) export_pdf_btn.click( export_current_analysis_pdf, outputs=export_pdf_btn ) export_csv_btn.click( export_current_analysis_csv, outputs=export_csv_btn ) # Back button from input page to task page input_back_btn.click( show_task_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) # Auto-load history when History tab is selected def load_history_if_selected(evt: gr.SelectData, session): """Load history only when History tab is selected.""" if evt.index == 4: # History tab is index 4 (0-indexed: Analysis, Dashboard, Tax, Stress, History) return sync_load_history(session) return gr.skip(), gr.skip() results_tabs.select( load_history_if_selected, inputs=[session_state], outputs=[history_table_results, history_details_output_results] ) back_to_input_btn.click( show_input_page, outputs=[task_page, input_page, results_page, history_page] ) # History table selection handlers history_table.select( view_historical_analysis, outputs=[history_details_output] ) history_table_results.select( view_historical_analysis, outputs=[history_details_output_results] ) # History search and filter handlers (standalone history page) history_search.change( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search, history_date_filter, history_current_page], outputs=[history_table, history_page_info], show_api=False ).then( lambda: 1, # Reset to page 1 on search outputs=[history_current_page], show_api=False ) history_date_filter.change( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search, history_date_filter, history_current_page], outputs=[history_table, history_page_info], show_api=False ).then( lambda: 1, # Reset to page 1 on filter change outputs=[history_current_page], show_api=False ) history_refresh_btn.click( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search, history_date_filter, history_current_page], outputs=[history_table, history_page_info], show_api=False ) # Pagination handlers (standalone) history_prev_btn.click( lambda page: max(1, page - 1), inputs=[history_current_page], outputs=[history_current_page], show_api=False ).then( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search, history_date_filter, history_current_page], outputs=[history_table, history_page_info], show_api=False ) history_next_btn.click( lambda page: page + 1, inputs=[history_current_page], outputs=[history_current_page], show_api=False ).then( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search, history_date_filter, history_current_page], outputs=[history_table, history_page_info], show_api=False ) # History search and filter handlers (results page tab) history_search_results.change( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results], outputs=[history_table_results, history_page_info_results], show_api=False ).then( lambda: 1, # Reset to page 1 on search outputs=[history_current_page_results], show_api=False ) history_date_filter_results.change( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results], outputs=[history_table_results, history_page_info_results], show_api=False ).then( lambda: 1, # Reset to page 1 on filter change outputs=[history_current_page_results], show_api=False ) history_refresh_btn_results.click( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results], outputs=[history_table_results, history_page_info_results], show_api=False ) # Pagination handlers (results page) history_prev_btn_results.click( lambda page: max(1, page - 1), inputs=[history_current_page_results], outputs=[history_current_page_results], show_api=False ).then( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results], outputs=[history_table_results, history_page_info_results], show_api=False ) history_next_btn_results.click( lambda page: page + 1, inputs=[history_current_page_results], outputs=[history_current_page_results], show_api=False ).then( lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page), inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results], outputs=[history_table_results, history_page_info_results], show_api=False ) # Tax load holdings button tax_load_holdings_btn.click( load_tax_holdings, inputs=[], outputs=[tax_cost_basis_input] ) # Tax calculator button (Enhancement #5) tax_calculate_btn.click( calculate_tax_impact, inputs=[tax_filing_status, tax_annual_income, tax_cost_basis_method, tax_cost_basis_input], outputs=[tax_analysis_output] ) # Wire up stress test button stress_test_btn.click( run_stress_test, inputs=[stress_scenario_dropdown, stress_n_sims, stress_horizon], outputs=[ stress_summary, stress_dashboard_plot, stress_mc_plot, stress_scenario_plot, stress_drawdown_plot ] ) # Authentication event handlers async def handle_login(email: str, password: str, current_session: Dict): """Handle user login.""" if not email or not password: return current_session, "Please enter email and password", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() success, message, session = await auth.login(email, password) if success and session: # Update session state session_dict = session.to_dict() # Get past portfolio dropdown choices dropdown_choices = await get_past_portfolios_for_dropdown(session.user_id) return ( session_dict, f"✅ {message}", gr.update(visible=False), # Hide login form gr.update(visible=True), # Show task selection gr.update(value=f"👤 {session.username} | {session.email}"), # Update user info gr.update(choices=dropdown_choices), # Update past portfolio dropdown gr.update(visible=True), # Show logout button gr.update(visible=True) # Show sidebar ) else: # On failure, don't show logout_btn or sidebar return current_session, f"❌ {message}", gr.update(), gr.update(), gr.update(), gr.update(), gr.update(), gr.update() async def handle_signup(email: str, password: str, confirm_password: str, username: str, current_session: Dict): """Handle user signup.""" # Validation if not email or not password: return current_session, "❌ Please enter email and password" if password != confirm_password: return current_session, "❌ Passwords do not match" # NIST SP 800-63B-4: Minimum 15 characters for single-factor auth # No composition rules (uppercase, numbers, symbols) required if len(password) < 15: return current_session, "❌ Password must be at least 15 characters (NIST 2024 requirement)" success, message, session = await auth.signup(email, password, username) if success: if session: # Auto-login after signup (if email confirmation disabled) return session.to_dict(), f"✅ {message}" else: return current_session, f"✅ {message}" else: return current_session, f"❌ {message}" async def handle_logout(current_session: Dict): """Handle user logout and reset all UI components. Security: Clears all global state and user-specific cache to prevent data leakage between user sessions. """ global LAST_ANALYSIS_STATE, HISTORY_RECORDS, LAST_STRESS_TEST session = UserSession.from_dict(current_session) # Extract user ID before clearing session (for cache cleanup) user_id = session.user_id if session else None # 1. Clear backend authentication success, message = await auth.logout(session) # 2. Clear global state variables (CRITICAL for data security) LAST_ANALYSIS_STATE = None HISTORY_RECORDS = [] LAST_STRESS_TEST = None # 3. Clear user-specific cache entries if user_id: try: from backend.caching.factory import cache_manager # Clear user-specific data from cache cache_manager.invalidation.invalidate_by_event("user_update", user_id) logger.info(f"Cleared cache for user {user_id}") except Exception as e: logger.error(f"Failed to clear user cache on logout: {e}") # 4. Clear session and reset ALL UI components to initial state return ( {}, # Clear session state f"✅ {message}", # Login message gr.update(visible=True), # Show login container gr.update(visible=False), # Hide task page gr.update(visible=False), # Hide input page gr.update(value="Not logged in"), # Clear user info gr.update(visible=False), # Hide logout button gr.update(visible=False), # Hide sidebar gr.update(visible=False), # Hide results page gr.update(visible=False), # Hide loading page gr.update(visible=False), # Hide history page # Clear all analysis outputs gr.update(value=""), # analysis_output gr.update(value=None), # allocation_plot gr.update(value=None), # risk_plot gr.update(value=None), # performance_plot gr.update(value=None), # correlation_plot gr.update(value=None), # optimization_plot # Clear history gr.update(value=[]), # history_table # Clear password reset modals gr.update(visible=False), # password_reset_modal gr.update(visible=False), # password_update_modal # Clear user input data (SECURITY: prevent data leakage) gr.update(value=""), # portfolio_input gr.update(choices=[], value=None), # load_past_portfolio_dropdown # Clear build page gr.update(visible=False), # Hide build_page gr.update(visible=False), # Hide build_results_container gr.update(value=[]), # Clear build_agent_chat gr.update(value=[]), # Clear build_goals gr.update(value=5), # Reset build_risk_tolerance to middle value gr.update(value=""), # Clear build_constraints gr.update(value=""), # Clear build_status # Clear compare page gr.update(visible=False), # Hide compare_page gr.update(visible=False), # Hide compare_results_container gr.update(value=[]), # Clear compare_debate_chat gr.update(value=""), # Clear compare_portfolio_input gr.update(value=""), # Clear compare_status gr.update(value=""), # Clear compare_bull_case gr.update(value=0), # Reset compare_bull_confidence gr.update(value=""), # Clear compare_bear_case gr.update(value=0), # Reset compare_bear_confidence gr.update(value=""), # Clear compare_consensus gr.update(value=""), # Clear compare_stance gr.update(value=[]), # Clear compare_debate_transcript # Clear test page gr.update(visible=False), # Hide test_page gr.update(visible=False), # Hide test_results_container gr.update(value=""), # Clear test_portfolio_input gr.update(value=""), # Clear test_changes_input gr.update(value=100000), # Reset test_portfolio_value to default gr.update(value=""), # Clear test_status gr.update(value=[]), # Clear test_current_metrics gr.update(value=[]), # Clear test_simulated_metrics gr.update(value=[]), # Clear test_impact_summary gr.update(value=[]), # Clear test_stress_comparison gr.update(value=""), # Clear test_assessment gr.update(value=""), # Clear test_recommendations ) def sync_login(email: str, password: str, current_session: Dict): """Synchronous wrapper for login.""" return asyncio.run(handle_login(email, password, current_session)) def sync_signup(email: str, password: str, confirm_password: str, username: str, current_session: Dict): """Synchronous wrapper for signup.""" return asyncio.run(handle_signup(email, password, confirm_password, username, current_session)) def sync_logout(current_session: Dict): """Synchronous wrapper for logout.""" return asyncio.run(handle_logout(current_session)) async def handle_password_reset(email: str): """Handle password reset request.""" if not email: return "", "❌ Please enter your email address" success, message = await auth.request_password_reset(email) # Simple success message if success: return "", f"{message}\n\nCheck your email and click the reset link to set your new password." return "", message def sync_password_reset(email: str): """Synchronous wrapper for password reset.""" return asyncio.run(handle_password_reset(email)) async def handle_password_update(password: str, confirm: str, email: str, token_hash: str): """Handle password update after reset with recovery token_hash.""" if not email: return "❌ Please enter your email address" if not password or not confirm: return "❌ Please enter both password fields" if password != confirm: return "❌ Passwords do not match" # NIST SP 800-63B-4: Minimum 15 characters for single-factor auth if len(password) < 15: return "❌ Password must be at least 15 characters (NIST 2024 requirement)" success, message = await auth.update_password(password, email, token_hash) return message def sync_password_update(password: str, confirm: str, email: str, token_hash: str): """Synchronous wrapper for password update.""" return asyncio.run(handle_password_update(password, confirm, email, token_hash)) # Connect authentication handlers login_btn.click( sync_login, inputs=[login_email, login_password, session_state], outputs=[session_state, login_message, login_container, task_page, user_info, load_past_portfolio_dropdown, logout_btn, sidebar] ) signup_btn.click( sync_signup, inputs=[signup_email, signup_password, signup_confirm, signup_username, session_state], outputs=[session_state, signup_message] ) # Real-time email validation signup_email.change( validate_email_format, inputs=[signup_email], outputs=[signup_email_validation] ) # Real-time password strength meter signup_password.change( check_password_strength, inputs=[signup_password], outputs=[signup_password_strength] ) # Real-time password match validation signup_confirm.change( validate_password_match, inputs=[signup_password, signup_confirm], outputs=[signup_password_match] ) # Password reset handlers forgot_password_btn.click( lambda: (gr.update(visible=False), gr.update(visible=True)), outputs=[login_container, password_reset_modal], show_api=False ) reset_cancel_btn.click( lambda: (gr.update(visible=True), gr.update(visible=False), ""), outputs=[login_container, password_reset_modal, reset_message], show_api=False ) update_cancel_btn.click( lambda: (gr.update(visible=True), gr.update(visible=False), ""), outputs=[login_container, password_update_modal, update_password_message], show_api=False ) reset_submit_btn.click( sync_password_reset, inputs=[reset_email], outputs=[reset_email, reset_message] ) # Show password update modal when recovery token_hash is detected recovery_token_hash.change( fn=lambda token: ( gr.update(visible=False) if token else gr.update(visible=True), gr.update(visible=True) if token else gr.update(visible=False) ), inputs=[recovery_token_hash], outputs=[login_container, password_update_modal], show_api=False ) # Password update handler (separate modal after email link click) update_password_btn.click( sync_password_update, inputs=[new_password, confirm_new_password, recovery_email, recovery_token_hash], outputs=[update_password_message] ).then( # On success, hide modal and show login form lambda msg: ( gr.update(visible=False) if "✅" in msg else gr.update(visible=True), gr.update(visible=True) if "✅" in msg else gr.update(visible=False) ), inputs=[update_password_message], outputs=[password_update_modal, login_container], show_api=False ) def handle_demo_mode(current_session: Dict): """Handle demo mode activation (anonymous with rate limiting).""" # Create a demo session (not authenticated) demo_session = { "authenticated": False, "is_demo": True, "user_id": None, # No user ID for anonymous } return ( demo_session, # session_state "", # demo_message (cleared) gr.update(visible=False), # Hide login container gr.update(visible=True), # Show task selection "Demo Mode - 1 free analysis per day", # user_info gr.update(visible=False), # Hide logout button for demo gr.update(visible=True) # Show sidebar ) demo_btn.click( handle_demo_mode, inputs=[session_state], outputs=[session_state, demo_message, login_container, task_page, user_info, logout_btn, sidebar] ) logout_btn.click( sync_logout, inputs=[session_state], outputs=[ session_state, login_message, login_container, task_page, input_page, user_info, logout_btn, sidebar, results_page, loading_page, history_page, analysis_output, allocation_plot, risk_plot, performance_plot, correlation_plot, optimization_plot, history_table, password_reset_modal, password_update_modal, portfolio_input, load_past_portfolio_dropdown, # Build page components build_page, build_results_container, build_agent_chat, build_goals, build_risk_tolerance, build_constraints, build_status, # Compare page components compare_page, compare_results_container, compare_debate_chat, compare_portfolio_input, compare_status, compare_bull_case, compare_bull_confidence, compare_bear_case, compare_bear_confidence, compare_consensus, compare_stance, compare_debate_transcript, # Test page components test_page, test_results_container, test_portfolio_input, test_changes_input, test_portfolio_value, test_status, test_current_metrics, test_simulated_metrics, test_impact_summary, test_stress_comparison, test_assessment, test_recommendations ] ) # Navigation event handlers nav_new_analysis.click( show_task_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) # Task card event handlers task_analyse_btn.click( show_input_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) task_build_btn.click( show_build_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) task_compare_btn.click( show_compare_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) task_test_btn.click( show_test_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) # Build page event handlers build_back_btn.click( show_task_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) build_submit_btn.click( handle_build_portfolio, inputs=[build_goals, build_risk_tolerance, build_constraints, session_state], outputs=[ build_agent_chat, build_results_container, build_status, build_audio_btn # Audio button ] ) build_regenerate_btn.click( handle_build_portfolio, inputs=[build_goals, build_risk_tolerance, build_constraints, session_state], outputs=[ build_agent_chat, build_results_container, build_status, build_audio_btn ] ) # Compare page event handlers compare_back_btn.click( show_task_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) compare_submit_btn.click( handle_compare_portfolio, inputs=[compare_portfolio_input, session_state], outputs=[ compare_debate_chat, compare_results_container, compare_status, compare_bull_case, compare_bull_confidence, compare_bear_case, compare_bear_confidence, compare_consensus, compare_stance, compare_debate_transcript, compare_audio_btn # Audio button ] ) # Test page event handlers test_back_btn.click( show_task_page, outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] ) test_submit_btn.click( sync_handle_test_changes, inputs=[test_portfolio_input, test_changes_input, test_portfolio_value, session_state], outputs=[ test_results_container, test_status, test_current_metrics, test_simulated_metrics, test_impact_summary, test_stress_comparison, test_assessment, test_recommendations ] ) nav_view_history.click( show_history_page, outputs=[task_page, input_page, results_page, history_page] ).then( sync_load_history, inputs=[session_state], outputs=[history_table, history_details_output] ) # Sidebar sign out button (mirrors main logout button) nav_signout_btn.click( sync_logout, inputs=[session_state], outputs=[ session_state, login_message, login_container, task_page, input_page, user_info, logout_btn, sidebar, results_page, loading_page, history_page, analysis_output, allocation_plot, risk_plot, performance_plot, correlation_plot, optimization_plot, history_table, password_reset_modal, password_update_modal, portfolio_input, load_past_portfolio_dropdown, # Build page components build_page, build_results_container, build_agent_chat, build_goals, build_risk_tolerance, build_constraints, build_status, # Compare page components compare_page, compare_results_container, compare_debate_chat, compare_portfolio_input, compare_status, compare_bull_case, compare_bull_confidence, compare_bear_case, compare_bear_confidence, compare_consensus, compare_stance, compare_debate_transcript, # Test page components test_page, test_results_container, test_portfolio_input, test_changes_input, test_portfolio_value, test_status, test_current_metrics, test_simulated_metrics, test_impact_summary, test_stress_comparison, test_assessment, test_recommendations ] ) # ============================================================ # AUDIO BUTTON EVENT HANDLERS # ============================================================ # Use event chaining to show loading state during audio generation: # 1. Disable button and change text to "Generating..." # 2. Run async audio generation with progress indicator # 3. Restore button to original state # Analysis audio button analysis_audio_btn.click( fn=lambda: gr.update(value="Generating...", interactive=False), outputs=analysis_audio_btn ).then( fn=generate_analysis_audio, inputs=[], outputs=[analysis_audio_player, analysis_audio_btn], show_progress="minimal" ).then( fn=lambda: gr.update(value="🔊 Listen to Analysis", interactive=True), outputs=analysis_audio_btn ) # Build portfolio audio button build_audio_btn.click( fn=lambda: gr.update(value="Generating...", interactive=False), outputs=build_audio_btn ).then( fn=generate_build_audio, inputs=[], outputs=[build_audio_player, build_audio_btn], show_progress="minimal" ).then( fn=lambda: gr.update(value="🔊 Listen to Portfolio", interactive=True), outputs=build_audio_btn ) # Compare/debate audio button compare_audio_btn.click( fn=lambda: gr.update(value="Generating...", interactive=False), outputs=compare_audio_btn ).then( fn=generate_debate_audio, inputs=[], outputs=[compare_audio_player, compare_audio_btn], show_progress="minimal" ).then( fn=lambda: gr.update(value="🎭 Listen to Debate", interactive=True), outputs=compare_audio_btn ) # MCP Tool Registrations (API/MCP only - no UI components) # Market Data Tools gr.api(mcp_tools.market_get_quote, api_name="market_get_quote") gr.api(mcp_tools.market_get_historical_data, api_name="market_get_historical_data") gr.api(mcp_tools.market_get_fundamentals, api_name="market_get_fundamentals") gr.api(mcp_tools.market_get_company_profile, api_name="market_get_company_profile") gr.api(mcp_tools.market_get_income_statement, api_name="market_get_income_statement") gr.api(mcp_tools.market_get_balance_sheet, api_name="market_get_balance_sheet") gr.api(mcp_tools.market_get_cash_flow_statement, api_name="market_get_cash_flow_statement") gr.api(mcp_tools.market_get_financial_ratios, api_name="market_get_financial_ratios") gr.api(mcp_tools.market_get_key_metrics, api_name="market_get_key_metrics") gr.api(mcp_tools.market_get_economic_series, api_name="market_get_economic_series") # Technical Analysis Tools gr.api(mcp_tools.technical_get_indicators, api_name="technical_get_indicators") gr.api(mcp_tools.technical_extract_features, api_name="technical_extract_features") gr.api(mcp_tools.technical_normalise_features, api_name="technical_normalise_features") gr.api(mcp_tools.technical_select_features, api_name="technical_select_features") gr.api(mcp_tools.technical_compute_feature_vector, api_name="technical_compute_feature_vector") # Portfolio Optimisation Tools gr.api(mcp_tools.portfolio_optimize_hrp, api_name="portfolio_optimize_hrp") gr.api(mcp_tools.portfolio_optimize_black_litterman, api_name="portfolio_optimize_black_litterman") gr.api(mcp_tools.portfolio_optimize_mean_variance, api_name="portfolio_optimize_mean_variance") # Risk Analysis Tools gr.api(mcp_tools.risk_analyze, api_name="risk_analyze") gr.api(mcp_tools.risk_forecast_volatility_garch, api_name="risk_forecast_volatility_garch") # ML Forecasting Tools gr.api(mcp_tools.ml_forecast_ensemble, api_name="ml_forecast_ensemble") # Sentiment Analysis Tools gr.api(mcp_tools.sentiment_get_news, api_name="sentiment_get_news") return demo if __name__ == "__main__": demo = create_interface() # Suppress console output on HF Spaces for cleaner logs is_hf_space = os.environ.get("SPACE_ID") is not None demo.queue().launch( server_name="0.0.0.0", server_port=7860, share=False, quiet=is_hf_space, # Suppress verbose output on HF Spaces mcp_server=True, # Native MCP server at /gradio_api/mcp/ allowed_paths=["/tmp"] # Allow serving export files from temp directory )