Spaces:
Running
on
Zero
Running
on
Zero
| """Portfolio Intelligence Platform - Gradio Interface. | |
| Main application file for the hackathon demo. | |
| Features: | |
| - Single-page workflow (input → loading → results) | |
| - Unified dashboard with responsive bento grid layout | |
| - Auto-refreshing visualisations | |
| - Full-screen responsive design | |
| - Agent reasoning transparency | |
| """ | |
| import os | |
| import warnings | |
| from dotenv import load_dotenv | |
| # Load environment variables first (before any other imports) | |
| load_dotenv() | |
| # Configure logging early (before other imports create loggers) | |
| # Controlled via environment variables: | |
| # LOG_LEVEL: DEBUG, INFO, WARNING, ERROR, CRITICAL, OFF | |
| # LOGGING_ENABLED: true/false | |
| from backend.logging_config import configure_logging | |
| configure_logging() | |
| # Disable telemetry on HuggingFace Spaces | |
| if os.environ.get("SPACE_ID"): | |
| os.environ["GRADIO_ANALYTICS_ENABLED"] = "False" | |
| os.environ["HF_HUB_DISABLE_TELEMETRY"] = "true" | |
| # Now import remaining modules | |
| import gradio as gr | |
| import asyncio | |
| import re | |
| import logging | |
| import time | |
| import random | |
| import numpy as np | |
| import traceback | |
| import sys | |
| from typing import List, Dict, Any, Tuple, Optional | |
| from datetime import datetime | |
| # Initialise Sentry monitoring early (before other imports that might raise errors) | |
| from backend.monitoring import initialise_sentry | |
| initialise_sentry() | |
| from backend.mcp_router import mcp_router | |
| from backend import mcp_tools | |
| from backend.agents.workflow import PortfolioAnalysisWorkflow | |
| from backend.models.agent_state import AgentState | |
| from backend.database import db | |
| from backend.theme import get_financial_theme, FINANCIAL_CSS | |
| from backend.visualizations import ( | |
| create_portfolio_allocation_chart, | |
| create_risk_metrics_dashboard, | |
| create_performance_chart, | |
| create_correlation_heatmap, | |
| create_optimization_comparison, | |
| ) | |
| from backend.stress_testing import ( | |
| PortfolioStressTest, | |
| STRESS_SCENARIOS, | |
| create_monte_carlo_paths_plot, | |
| create_scenario_comparison_chart, | |
| create_drawdown_analysis_chart, | |
| create_stress_test_dashboard, | |
| ) | |
| from backend.agents.personas import get_available_personas | |
| from backend.tax.interface import create_tax_analysis, format_tax_analysis_output | |
| from backend.config import settings | |
| from backend.rate_limiting import ( | |
| GradioRateLimitMiddleware, | |
| UserTier, | |
| ) | |
| from backend.rate_limiting.fixed_window import TieredFixedWindowLimiter | |
| from backend.auth import auth, UserSession | |
| from backend.export import export_analysis_to_csv, export_analysis_to_pdf | |
| # Import ensemble predictor at startup so @spaces.GPU decorator is detected by ZeroGPU | |
| # This module contains GPU-accelerated ML forecasting (Chronos, TTM, N-HiTS) | |
| import backend.mcp_servers.ensemble_predictor_mcp # noqa: F401 | |
def check_authentication(session_state: Dict) -> bool:
    """Report whether the given session is allowed to use the app.

    Args:
        session_state: Session state dict; may be empty or None.

    Returns:
        True for demo sessions (``is_demo`` truthy) and for sessions that
        ``UserSession.from_dict`` turns into an object whose ``user_id`` is
        not None. False for an empty/missing session state or otherwise.
    """
    # No session at all: nothing to authenticate.
    if not session_state:
        return False

    # Demo mode is anonymous access and needs no user record.
    if session_state.get("is_demo", False):
        return True

    # Regular login: a session object with a user id must be recoverable.
    user_session = UserSession.from_dict(session_state)
    if user_session is None:
        return False
    return user_session.user_id is not None
# Suppress websockets deprecation warnings
# Note: websockets 15.0+ deprecated WebSocketServerProtocol used by uvicorn 0.38.0
# This is a known issue with no functionality impact. The legacy API is supported until 2030.
# To eliminate warnings: upgrade to uvicorn 0.35.0+ when available with SansIO implementation
# NOTE(review): the two uvicorn versions quoted above (0.38.0 in use, 0.35.0+ as the
# fix) contradict each other - confirm which release actually drops the legacy API.
# See: https://websockets.readthedocs.io/en/stable/project/changelog.html
warnings.filterwarnings("ignore", category=DeprecationWarning, module="websockets.legacy")
warnings.filterwarnings("ignore", category=DeprecationWarning, module="websockets.server")
# Suppress Gradio internal deprecation warnings
warnings.filterwarnings("ignore", category=DeprecationWarning, module="gradio.routes")
# Get logger for this module (logging already configured via configure_logging())
logger = logging.getLogger(__name__)
# Initialize workflow
# A default workflow instance bound to the shared MCP router. The analysis entry
# points defined in this part of the file build their own per-request workflow
# instead; whether this instance is used elsewhere is not visible here - TODO confirm.
workflow = PortfolioAnalysisWorkflow(mcp_router)
| # Custom get_user_tier function for session-aware rate limiting | |
def get_user_tier_from_session(request: Optional[gr.Request], session_state: Optional[dict] = None):
    """Pick the rate-limiting identity and tier for a request.

    Args:
        request: Gradio request object (may be None).
        session_state: Session state dict containing user authentication info.

    Returns:
        Tuple of (identifier, tier):
        - logged-in, non-demo session: (str(user_id), UserTier.AUTHENTICATED)
        - anything else: (first 16 hex chars of SHA-256 of the client IP,
          UserTier.ANONYMOUS); the IP falls back to "unknown" when it cannot
          be determined.
    """
    import hashlib

    # A real (non-demo) login is limited per user id.
    if session_state and isinstance(session_state, dict):
        authenticated_id = session_state.get("user_id")
        demo_session = session_state.get("is_demo", False)
        if authenticated_id and not demo_session:
            return str(authenticated_id), UserTier.AUTHENTICATED

    # Everyone else is limited per client IP.
    client_ip = "unknown"
    if request:
        try:
            peer = request.client if hasattr(request, "client") else None
            if peer:
                if hasattr(peer, "host"):
                    client_ip = peer.host
                elif isinstance(peer, str):
                    client_ip = peer

            # Behind a proxy the first X-Forwarded-For entry wins over the peer address.
            if hasattr(request, "headers"):
                forwarded = request.headers.get("X-Forwarded-For")
                if forwarded:
                    client_ip = forwarded.split(",")[0].strip()
        except Exception as e:
            logger.warning(f"Error extracting client IP: {e}")

    # Only a truncated hash of the IP is used as the identifier (privacy).
    ip_digest = hashlib.sha256(client_ip.encode()).hexdigest()
    return ip_digest[:16], UserTier.ANONYMOUS
# Rate limiting setup. Both names stay None when limiting is disabled in
# settings; if construction fails, startup continues without rate limiting.
rate_limiter = None
rate_limit_middleware = None

if not settings.rate_limit_enabled:
    logger.info("Rate limiting disabled")
else:
    try:
        # One capacity per user tier, all taken from settings.
        rate_limiter = TieredFixedWindowLimiter(
            tier_limits={
                UserTier.ANONYMOUS: settings.rate_limit_anonymous_capacity,
                UserTier.AUTHENTICATED: settings.rate_limit_authenticated_capacity,
                UserTier.PREMIUM: settings.rate_limit_premium_capacity,
            },
            redis_url=settings.redis_url,
        )
        # The middleware resolves (identifier, tier) through the session-aware helper.
        rate_limit_middleware = GradioRateLimitMiddleware(
            rate_limiter,
            get_user_tier=get_user_tier_from_session,
        )
        logger.info("Rate limiting enabled with fixed window (daily reset at midnight UTC)")
    except Exception as e:
        # Deliberate best-effort: a broken limiter must not prevent startup.
        logger.error(f"Failed to initialise rate limiter: {e}")
        logger.warning("Continuing without rate limiting")
# Global state for visualisations
# NOTE(review): these are module-level names, so every session served by this
# process reads and overwrites the same values - confirm that is intended.
LAST_ANALYSIS_STATE = None  # Final workflow state of the most recent run_analysis() call
HISTORY_RECORDS = []  # Stores loaded history records for row selection
LAST_STRESS_TEST = None  # PortfolioStressTest instance created by the latest run_stress_test()
LAST_EXPORT_PDF_PATH = None  # Pre-generated PDF export path
LAST_EXPORT_CSV_PATH = None  # Pre-generated CSV export path
# Global state for audio generation
LAST_ANALYSIS_TEXT = None  # Stores analysis text for audio generation
LAST_BUILD_RESULT = None  # Stores build portfolio result for audio
LAST_DEBATE_DATA = None  # Stores debate data for audio simulation
# Loading screen rotating messages with MCP phases and disclaimers
# One entry is picked with random.choice() for early progress updates.
LOADING_MESSAGES = [
    "MCP Workflow: Initialising Model Context Protocol servers...",
    "Disclaimer: This is NOT financial advice - consult a professional!",
    "MCP: Aggregating context from multiple data sources...",
    "Humour: If in doubt, blame the algorithm!",
    "MCP Workflow: Processing portfolio through context pipeline...",
    "Warning: Past performance does not guarantee future results!",
    "MCP: Routing requests to specialised analysis servers...",
    "Remember: Markets can stay irrational longer than you can stay solvent!",
    "MCP Workflow: Extracting insights from financial context...",
    "Important: Not a substitute for professional financial guidance!",
    "MCP: Coordinating between data, analysis, and reporting contexts...",
    "Pro tip: Diversification - the only free lunch in investing!",
    "Phase 1: Fetching market data from Yahoo Finance, FMP, FRED...",
    "Phase 2: Running portfolio optimisation and risk analysis...",
    "Phase 3: Generating AI insights with Claude Sonnet 4.5...",
    "Data Collection: Retrieving real-time quotes and historical prices...",
    "Computation: Calculating portfolio metrics and risk indicators...",
    "Analysis: Synthesising comprehensive portfolio recommendations...",
]
def parse_portfolio_input(portfolio_text: str) -> List[Dict[str, Any]]:
    """Parse portfolio input text into structured holdings.

    One holding per line. Supported formats:
    - AAPL 50 (50 shares)
    - TSLA 25 shares
    - NVDA $5000 or NVDA $5,000 (dollar amount; thousands separators allowed)
    - BTC 0.5 (fractional shares/crypto)

    Lines that do not match, or whose number is malformed (e.g. ``1.2.3``),
    are logged and skipped; they never abort parsing of the other lines.

    Args:
        portfolio_text: Raw text input from user

    Returns:
        List of holding dictionaries with keys ``ticker`` (upper-cased),
        ``quantity``, ``dollar_amount`` and ``cost_basis``. Exactly one of
        ``quantity`` / ``dollar_amount`` carries the parsed amount; the other
        is 0. ``cost_basis`` is always 0.
    """
    holdings = []

    for raw_line in portfolio_text.strip().splitlines():
        line = raw_line.strip()
        if not line:
            continue

        # TICKER QUANTITY [shares] or TICKER $AMOUNT. Commas are accepted inside
        # the number so "$5,000" is not truncated to "$5" at the separator.
        match = re.match(r'([A-Za-z]+)\s+(\$)?([0-9.,]+)\s*(shares)?', line, re.IGNORECASE)
        if not match:
            logger.warning(f"Could not parse line: {line}")
            continue

        try:
            amount = float(match.group(3).replace(',', ''))
        except ValueError:
            # e.g. "1.2.3" or a lone "." - skip this line only.
            logger.warning(f"Could not parse line: {line}")
            continue

        is_dollar = match.group(2) == '$'
        holdings.append({
            'ticker': match.group(1).upper(),
            'quantity': amount if not is_dollar else 0,
            'dollar_amount': amount if is_dollar else 0,
            'cost_basis': 0,
        })

    return holdings
def aggregate_holdings(holdings: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
    """Aggregate holdings by ticker while preserving lot-level data.

    Entries for the same ticker are summed and the individual entries are kept
    as lots. A ticker that appears more than once gets one warning describing
    either the mixed entry types (shares and dollars) or the duplicate count
    with the aggregated total.

    Args:
        holdings: List of holding dictionaries from parse_portfolio_input

    Returns:
        Dictionary keyed by ticker; each value has the keys ``ticker``,
        ``total_quantity``, ``total_dollar_amount``, ``lots`` and ``warnings``.
    """
    aggregated: Dict[str, Dict[str, Any]] = {}

    for holding in holdings:
        ticker = holding['ticker']
        if ticker not in aggregated:
            aggregated[ticker] = {
                'ticker': ticker,
                'total_quantity': 0,
                'total_dollar_amount': 0,
                'lots': [],
                'warnings': [],
            }
        entry = aggregated[ticker]
        entry['total_quantity'] += holding.get('quantity', 0)
        entry['total_dollar_amount'] += holding.get('dollar_amount', 0)
        entry['lots'].append(holding)

    # Check for duplicates and mixed entry types
    for ticker, data in aggregated.items():
        lots = data['lots']
        if len(lots) <= 1:
            continue

        # .get mirrors the accumulation above, so a lot missing a key counts as 0.
        has_shares = any(lot.get('quantity', 0) > 0 for lot in lots)
        has_dollars = any(lot.get('dollar_amount', 0) > 0 for lot in lots)

        if has_shares and has_dollars:
            data['warnings'].append(
                f"{ticker} has mixed entry types (shares and dollars). "
                f"Total: {data['total_quantity']:.2f} sh + ${data['total_dollar_amount']:,.2f}"
            )
            continue

        # Choose the total first, then build the message, so the
        # "appears N times" prefix is present for dollar-only duplicates too.
        if has_shares:
            total_text = f"{data['total_quantity']:.2f} shares"
        else:
            total_text = f"${data['total_dollar_amount']:,.2f}"
        data['warnings'].append(
            f"{ticker} appears {len(lots)} times. Aggregated: {total_text}"
        )

    return aggregated
async def run_analysis(
    portfolio_text: str,
    roast_mode: bool = False,
    persona: Optional[str] = None
) -> str:
    """Parse the portfolio text, run the analysis workflow and format the result.

    The final workflow state is stored in the module-level
    ``LAST_ANALYSIS_STATE`` so the chart and stress-test helpers can reuse it.

    Args:
        portfolio_text: Raw portfolio input
        roast_mode: If True, use brutal honesty mode
        persona: Optional investor persona (e.g., 'warren_buffett'); any value
            other than "standard" takes precedence over ``roast_mode``

    Returns:
        Formatted analysis markdown, or a user-facing error message when the
        input is empty/unparseable, the workflow reports errors, or an
        exception is raised.
    """
    global LAST_ANALYSIS_STATE

    if not portfolio_text.strip():
        return "β Please enter your portfolio holdings"

    try:
        parsed_holdings = parse_portfolio_input(portfolio_text)
        if not parsed_holdings:
            return "β Could not parse portfolio. Please use format: TICKER QUANTITY"

        logger.info(
            f"Starting analysis for {len(parsed_holdings)} holdings "
            f"(Roast Mode: {roast_mode}, Persona: {persona or 'None'})"
        )

        # Seed state: inputs filled in, every result slot empty.
        run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        initial_state: AgentState = {
            'portfolio_id': f"demo_{run_stamp}",
            'user_query': 'Analyse my portfolio',
            'risk_tolerance': 'moderate',
            'holdings': parsed_holdings,
            'historical_prices': {},
            'fundamentals': {},
            'economic_data': {},
            'realtime_data': {},
            'technical_indicators': {},
            'optimisation_results': {},
            'risk_analysis': {},
            'ai_synthesis': '',
            'recommendations': [],
            'reasoning_steps': [],
            'current_step': 'starting',
            'errors': [],
            'mcp_calls': [],
            # Timing and token counters start out unset.
            **dict.fromkeys(
                (
                    'phase_1_duration_ms',
                    'phase_1_5_duration_ms',
                    'phase_2_duration_ms',
                    'phase_2_5_duration_ms',
                    'phase_3_duration_ms',
                    'llm_input_tokens',
                    'llm_output_tokens',
                    'llm_total_tokens',
                    'llm_request_count',
                ),
                None,
            ),
            'feature_vectors': {},
            'ensemble_forecasts': {},
            'sentiment_data': {},
        }

        # A non-standard persona wins over roast mode.
        if persona and persona != "standard":
            workflow_options = {'persona': persona}
        else:
            workflow_options = {'roast_mode': roast_mode}
        analysis_workflow = PortfolioAnalysisWorkflow(mcp_router, **workflow_options)

        final_state = await analysis_workflow.run(initial_state)
        LAST_ANALYSIS_STATE = final_state

        workflow_errors = final_state.get("errors")
        if workflow_errors:
            return f"β Analysis failed: {'; '.join(workflow_errors)}"

        return format_analysis_result(final_state, parsed_holdings)

    except Exception as e:
        logger.error(f"Analysis error: {e}", exc_info=True)
        return f"β Error during analysis: {str(e)}"
def format_performance_metrics(final_state: AgentState) -> str:
    """Build the markdown "Performance Metrics" section from a finished run.

    Covers the phase timings (1, 2, optional 2.5, 3) with their total, LLM
    token counts with an estimated cost, and the MCP call count per server.

    Args:
        final_state: Final AgentState from workflow

    Returns:
        Formatted markdown string with performance metrics
    """
    def _counter(key: str):
        # Missing and None both read as 0.
        return final_state.get(key, 0) or 0

    # Phase Execution Times
    phase_1_ms = _counter("phase_1_duration_ms")
    phase_2_ms = _counter("phase_2_duration_ms")
    phase_2_5_ms = _counter("phase_2_5_duration_ms")
    phase_3_ms = _counter("phase_3_duration_ms")
    total_ms = phase_1_ms + phase_2_ms + phase_2_5_ms + phase_3_ms

    sections = [
        "### Performance Metrics\n\n",
        "**Execution Timeline:**\n\n",
        f"- Phase 1 (Data Collection): {phase_1_ms:,}ms\n",
        f"- Phase 2 (Computation): {phase_2_ms:,}ms\n",
    ]
    if phase_2_5_ms > 0:
        sections.append(f"- Phase 2.5 (ML Predictions): {phase_2_5_ms:,}ms\n")
    sections.append(f"- Phase 3 (LLM Synthesis): {phase_3_ms:,}ms\n")
    sections.append(f"- **Total**: {total_ms:,}ms ({total_ms / 1000:.2f}s)\n\n")

    # LLM Token Usage
    input_tokens = _counter("llm_input_tokens")
    output_tokens = _counter("llm_output_tokens")
    total_tokens = _counter("llm_total_tokens")
    request_count = _counter("llm_request_count")

    # Cost estimate at $3 per million input tokens and $15 per million output tokens.
    input_cost = (input_tokens / 1_000_000) * 3.0
    output_cost = (output_tokens / 1_000_000) * 15.0
    total_cost = input_cost + output_cost

    sections.extend([
        "**LLM Token Usage (Claude Sonnet 4.5):**\n\n",
        f"- Input Tokens: {input_tokens:,} (${input_cost:.4f})\n",
        f"- Output Tokens: {output_tokens:,} (${output_cost:.4f})\n",
        f"- **Total**: {total_tokens:,} tokens (${total_cost:.4f})\n",
        f"- API Requests: {request_count}\n\n",
    ])

    # MCP Calls
    mcp_calls = final_state.get("mcp_calls", [])
    if mcp_calls:
        sections.append(f"**MCP Operations:** {len(mcp_calls)} total calls\n\n")

        # Count per server, in first-seen order.
        calls_per_server = {}
        for call in mcp_calls:
            server = call.get("mcp_server") or call.get("mcp", "unknown")
            calls_per_server[server] = calls_per_server.get(server, 0) + 1

        for server, count in calls_per_server.items():
            plural = 's' if count != 1 else ''
            sections.append(f"- **{server}**: {count} call{plural}\n")

    return "".join(sections)
def format_analysis_result(final_state: AgentState, holdings: List[Dict[str, Any]]) -> str:
    """Format analysis result for display.

    Produces, in order: the holdings list with weights, key risk metrics
    (plus information/Calmar ratio and ulcer index when present), ML forecast
    changes per ticker when ``ensemble_forecasts`` is non-empty, the AI
    synthesis text, and the numbered recommendations.

    Numeric values that are present in the state but set to None are shown
    as 0 instead of raising.

    Args:
        final_state: Final AgentState from workflow
        holdings: Original holdings list, used when the state has no
            ``holdings`` key

    Returns:
        Formatted markdown string
    """
    def _num(value: Any) -> float:
        # .get(key, 0) only covers a missing key; an explicit None must not crash.
        return 0.0 if value is None else float(value)

    def _risk_amount(entry: Any, key: str) -> float:
        # Risk entries are either a dict holding `key` or a bare number.
        raw = entry.get(key, 0) if isinstance(entry, dict) else entry
        return abs(_num(raw))

    current_holdings = final_state.get('holdings', holdings)

    # Calculate total portfolio value
    total_value = sum(_num(h.get('market_value')) for h in current_holdings)

    # Format holdings
    holdings_md = "### Portfolio Holdings\n\n"
    for h in current_holdings:
        ticker = h.get("ticker", "N/A")
        shares = _num(h.get("quantity"))
        value = _num(h.get("market_value"))
        weight = (value / total_value * 100) if total_value > 0 else 0

        # Entries given as shares show the share count; dollar entries do not.
        if shares > 0:
            holdings_md += f"- **{ticker}**: {shares:.2f} shares | ${value:,.2f} ({weight:.1f}%)\n"
        else:
            holdings_md += f"- **{ticker}**: ${value:,.2f} ({weight:.1f}%)\n"

    # Extract risk metrics from risk_analysis
    risk_analysis = final_state.get("risk_analysis", {}) or {}
    var_95 = _risk_amount(risk_analysis.get("var_95", {}), "var_absolute")
    cvar_95 = _risk_amount(risk_analysis.get("cvar_95", {}), "cvar_absolute")

    risk_metrics = risk_analysis.get("risk_metrics", {}) or {}
    sharpe = _num(risk_metrics.get("sharpe_ratio"))
    volatility = _num(risk_metrics.get("volatility_annual")) * 100

    metrics_md = (
        "\n### Key Metrics\n"
        f"- **Portfolio Value**: ${total_value:,.2f}\n"
        f"- **Sharpe Ratio**: {sharpe:.2f}\n"
        f"- **Volatility**: {volatility:.1f}% (annual)\n"
        f"- **VaR (95%)**: ${var_95:,.2f}\n"
        f"- **CVaR (95%)**: ${cvar_95:,.2f}\n"
    )

    # Add advanced metrics if available
    for label, key in (
        ("Information Ratio", "information_ratio"),
        ("Calmar Ratio", "calmar_ratio"),
        ("Ulcer Index", "ulcer_index"),
    ):
        advanced = risk_metrics.get(key)
        if advanced is not None:
            metrics_md += f"- **{label}**: {float(advanced):.2f}\n"

    # Format ML forecasts if available
    forecasts_md = ""
    ensemble_forecasts = final_state.get("ensemble_forecasts", {})
    if ensemble_forecasts:
        forecasts_md = "### ML Price Forecasts (30-day)\n\n"
        for ticker, forecast_data in ensemble_forecasts.items():
            if not isinstance(forecast_data, dict):
                continue
            models_used = forecast_data.get("models_used", [])
            if models_used:
                num_models = len(models_used)
            else:
                num_models = forecast_data.get("metadata", {}).get("num_models", "N/A")

            # Change from the first to the last predicted value.
            predictions = forecast_data.get("predictions", [])
            if not predictions:
                continue
            first_pred = float(predictions[0])
            last_pred = float(predictions[-1])
            change_pct = ((last_pred - first_pred) / first_pred * 100) if first_pred > 0 else 0

            # Escapes keep the markers distinct regardless of file encoding:
            # chart-up, chart-down, right arrow.
            if change_pct > 0:
                direction = "\U0001F4C8"
            elif change_pct < 0:
                direction = "\U0001F4C9"
            else:
                direction = "\u27A1\uFE0F"
            forecasts_md += f"- **{ticker}**: {direction} {change_pct:+.1f}% (using {num_models} models)\n"
        forecasts_md += "\n*Note: Forecasts combine Chronos foundation model with statistical baselines.*\n\n"

    # Get LLM synthesis and recommendations
    ai_synthesis = final_state.get("ai_synthesis", "Analysis completed successfully.")
    recommendations = final_state.get("recommendations", [])

    recs_md = "### Recommendations\n\n"
    if recommendations:
        for i, rec in enumerate(recommendations, 1):
            recs_md += f"{i}. {rec}\n"
    else:
        recs_md += "*No specific recommendations generated.*\n"

    # Final output
    return (
        f"{holdings_md}\n"
        f"{metrics_md}\n"
        f"{forecasts_md}\n"
        f"{ai_synthesis}\n"
        f"{recs_md}\n"
        "---\n"
        "*Interactive charts and detailed metrics are displayed below.*\n"
    )
def create_visualisations() -> Tuple:
    """Build the five dashboard charts from the most recent analysis state.

    Each chart is None when the data it needs is missing; all five are None
    when no analysis has been stored in ``LAST_ANALYSIS_STATE`` yet.

    Returns:
        Tuple of (allocation, risk, performance, correlation, optimization) plots
    """
    state = LAST_ANALYSIS_STATE
    if not state:
        return (None, None, None, None, None)

    # Extract data from AgentState
    holdings = state.get("holdings", [])
    prices = state.get("historical_prices", {})
    risk = state.get("risk_analysis", {})
    optimisation = state.get("optimisation_results", {})

    allocation_chart = None
    if holdings:
        allocation_chart = create_portfolio_allocation_chart(holdings)

    risk_chart = None
    if risk:
        def _as_percentage(entry, key):
            # Risk entries are either a dict holding `key` or a bare number.
            if isinstance(entry, dict):
                return float(entry.get(key, 0))
            return float(entry)

        var_value = _as_percentage(risk.get("var_95", {}), "var_percentage")
        cvar_value = _as_percentage(risk.get("cvar_95", {}), "cvar_percentage")
        metrics = risk.get("risk_metrics", {})
        sharpe_value = float(metrics.get("sharpe_ratio", 0))
        # Stored as a fraction; the dashboard receives it scaled by 100.
        volatility_value = float(metrics.get("volatility_annual", 0)) * 100

        risk_chart = create_risk_metrics_dashboard(
            sharpe=sharpe_value,
            var=var_value,
            cvar=cvar_value,
            volatility=volatility_value,
        )

    performance_chart = None
    correlation_chart = None
    if prices:
        performance_chart = create_performance_chart(holdings, prices)
        correlation_chart = create_correlation_heatmap(prices)

    optimization_chart = None
    if optimisation:
        optimization_chart = create_optimization_comparison(optimisation)

    return (allocation_chart, risk_chart, performance_chart, correlation_chart, optimization_chart)
async def run_stress_test(
    scenario_name: str,
    n_simulations: int = 10000,
    time_horizon_days: int = 252
) -> Tuple[str, Any, Any, Any, Any]:
    """Stress-test the portfolio from the most recent analysis.

    ``scenario_name`` selects the mode: "monte_carlo", "all_scenarios", or a
    key of ``STRESS_SCENARIOS``. The created ``PortfolioStressTest`` is stored
    in the module-level ``LAST_STRESS_TEST``.

    Args:
        scenario_name: Name of stress scenario to apply
        n_simulations: Number of Monte Carlo simulations
        time_horizon_days: Simulation time horizon in days

    Returns:
        Tuple of (summary_text, dashboard_plot, mc_plot, scenario_plot, drawdown_plot);
        plots that do not apply to the chosen mode are None, and on any
        failure the summary carries the message and all plots are None.
    """
    global LAST_STRESS_TEST

    def _message_only(text):
        # A text-only outcome: no plots to show.
        return (text, None, None, None, None)

    def _recovery_label(days, fallback):
        # Falsy estimates (None or 0) fall back to the given placeholder.
        if not days:
            return fallback
        return f"{days} days (~{days/365:.1f} years)"

    if not LAST_ANALYSIS_STATE:
        return _message_only("Please run portfolio analysis first before stress testing.")

    try:
        # Extract portfolio data
        holdings = LAST_ANALYSIS_STATE.get("holdings", [])
        historical_prices = LAST_ANALYSIS_STATE.get("historical_prices", {})
        if not holdings or not historical_prices:
            return _message_only("Insufficient portfolio data for stress testing.")

        stress_tester = PortfolioStressTest(
            holdings=holdings,
            historical_prices=historical_prices,
            n_simulations=n_simulations
        )
        LAST_STRESS_TEST = stress_tester

        portfolio_value = sum(h.get('market_value', 0) for h in holdings)

        dashboard_plot = mc_plot = scenario_plot = drawdown_plot = None

        if scenario_name == "monte_carlo":
            result = stress_tester.run_monte_carlo(time_horizon_days=time_horizon_days)
            dashboard_plot = create_stress_test_dashboard(result, portfolio_value)
            mc_plot = create_monte_carlo_paths_plot(result.simulation_paths, portfolio_value=portfolio_value)
            drawdown_plot = create_drawdown_analysis_chart(result.simulation_paths, portfolio_value=portfolio_value)

            summary = "".join([
                "# Monte Carlo Stress Test Results\n",
                "**Simulation Parameters:**\n",
                f"- Number of simulations: {n_simulations:,}\n",
                f"- Time horizon: {time_horizon_days} days (~{time_horizon_days/252:.1f} years)\n",
                "**Key Metrics:**\n",
                f"- Mean return: {result.portfolio_return:.2f}%\n",
                f"- Value at Risk (95%): {result.var_95:.2f}%\n",
                f"- Conditional VaR (95%): {result.cvar_95:.2f}%\n",
                f"- Maximum drawdown: {result.max_drawdown:.2f}%\n",
                "**Interpretation:**\n",
                f"- In 95% of scenarios, your portfolio loses less than {abs(result.var_95):.1f}%\n",
                f"- In the worst 5% of cases, average loss is {abs(result.cvar_95):.1f}%\n",
            ])

        elif scenario_name == "all_scenarios":
            all_results = stress_tester.run_all_scenarios()
            scenario_plot = create_scenario_comparison_chart(all_results)

            pieces = [
                "# All Stress Scenarios Comparison\n\n",
                "**Portfolio Performance Under Historical Crises:**\n\n",
            ]
            for scenario_id, result in all_results.items():
                recovery_text = _recovery_label(result.recovery_time_estimate, "N/A")
                pieces.append(f"### {result.scenario_name}\n")
                pieces.append(f"- Portfolio return: **{result.portfolio_return:.2f}%**\n")
                pieces.append(f"- Value change: **${result.portfolio_value_change:,.0f}**\n")
                pieces.append(f"- VaR 95%: {result.var_95:.2f}%\n")
                pieces.append(f"- CVaR 95%: {result.cvar_95:.2f}%\n")
                pieces.append(f"- Estimated recovery: {recovery_text}\n\n")
            summary = "".join(pieces)

        else:
            # Run specific scenario
            scenario = STRESS_SCENARIOS.get(scenario_name)
            if not scenario:
                return _message_only(f"Scenario '{scenario_name}' not found.")

            result = stress_tester.apply_scenario(scenario)
            dashboard_plot = create_stress_test_dashboard(result, portfolio_value)
            recovery_text = _recovery_label(result.recovery_time_estimate, "Not estimated")

            pieces = [
                f"# {result.scenario_name} - Stress Test Results\n",
                "**Scenario Description:**\n",
                f"{scenario.description}\n",
                "**Portfolio Impact:**\n",
                f"- Return under stress: **{result.portfolio_return:.2f}%**\n",
                f"- Dollar impact: **${result.portfolio_value_change:,.0f}**\n",
                f"- Current value: ${portfolio_value:,.0f}\n",
                f"- Stressed value: ${portfolio_value + result.portfolio_value_change:,.0f}\n",
                "**Risk Metrics:**\n",
                f"- Value at Risk (95%): {result.var_95:.2f}%\n",
                f"- Conditional VaR (95%): {result.cvar_95:.2f}%\n",
                f"- Value at Risk (99%): {result.var_99:.2f}%\n",
                f"- Conditional VaR (99%): {result.cvar_99:.2f}%\n",
                "**Recovery Estimate:**\n",
                f"- Estimated recovery time: {recovery_text}\n",
                "**Asset Contributions:**\n",
            ]
            # Ascending by contribution, so the largest detractors come first.
            ranked = sorted(result.asset_contributions.items(), key=lambda item: item[1])
            for ticker, contribution in ranked:
                pieces.append(f"- {ticker}: {contribution:+.2f}%\n")
            summary = "".join(pieces)

        return (summary, dashboard_plot, mc_plot, scenario_plot, drawdown_plot)

    except Exception as e:
        logger.error(f"Stress test error: {e}", exc_info=True)
        return _message_only(f"Error running stress test: {str(e)}")
| async def run_analysis_with_ui_update( | |
| session_state: Optional[Dict] = None, | |
| portfolio_text: str = "", | |
| roast_mode: bool = False, | |
| persona: Optional[str] = None, | |
| progress=gr.Progress() | |
| ) -> Tuple[str, str, str, Any, Any, Any, Any, Any, Any]: | |
| """Run analysis and return results with loading progress overlay. | |
| Uses rotating messages from LOADING_MESSAGES to display progress updates. | |
| Args: | |
| session_state: Session state dict containing user authentication info | |
| portfolio_text: Portfolio input | |
| roast_mode: If True, use brutal honesty mode | |
| persona: Optional investor persona (e.g., 'warren_buffett') | |
| progress: Gradio progress tracker | |
| Returns: | |
| Tuple of (page_state, analysis_text, performance_metrics, 5 charts, audio_btn_update) | |
| """ | |
| global LAST_ANALYSIS_STATE | |
| if not portfolio_text.strip(): | |
| return ( | |
| "input", | |
| "", | |
| "", | |
| None, None, None, None, None, | |
| gr.update(visible=False) | |
| ) | |
| try: | |
| progress(0, desc=random.choice(LOADING_MESSAGES)) | |
| await asyncio.sleep(0.3) | |
| holdings = parse_portfolio_input(portfolio_text) | |
| if not holdings: | |
| return ( | |
| "input", | |
| "", | |
| "", | |
| None, None, None, None, None, | |
| gr.update(visible=False) | |
| ) | |
| progress(0.1, desc=random.choice(LOADING_MESSAGES)) | |
| await asyncio.sleep(0.5) | |
| # Check if this is a demo session | |
| is_demo = session_state.get("is_demo", False) if session_state else False | |
| # Generate portfolio ID with appropriate prefix | |
| timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
| if is_demo: | |
| portfolio_id = f"demo_{timestamp}" | |
| else: | |
| # Authenticated users get portfolio IDs with user prefix | |
| session = UserSession.from_dict(session_state) | |
| user_prefix = session.username[:8] if session and session.username else "user" | |
| portfolio_id = f"{user_prefix}_{timestamp}" | |
| # Only save portfolio to database for authenticated users | |
| # Demo users get ephemeral portfolios that exist only for the current session | |
| if not is_demo: | |
| try: | |
| session = UserSession.from_dict(session_state) | |
| user_id = session.user_id if session and session.user_id else None | |
| if not user_id: | |
| logger.error("No valid user_id available for portfolio creation") | |
| raise ValueError("User ID not available") | |
| portfolio_saved = await db.save_portfolio( | |
| portfolio_id=portfolio_id, | |
| user_id=user_id, | |
| name=f"Analysis {datetime.now().strftime('%Y-%m-%d %H:%M')}", | |
| risk_tolerance='moderate' | |
| ) | |
| if portfolio_saved: | |
| logger.info(f"β Portfolio {portfolio_id} created for user {user_id}") | |
| else: | |
| logger.error(f"β Failed to create portfolio {portfolio_id} in database") | |
| raise ValueError(f"Portfolio creation failed for {portfolio_id}") | |
| except Exception as e: | |
| logger.error(f"β Failed to save portfolio: {e}") | |
| raise ValueError(f"Database error: {e}") from e | |
| else: | |
| logger.info(f"Demo mode: Portfolio {portfolio_id} created (ephemeral, not saved to database)") | |
| initial_state: AgentState = { | |
| 'portfolio_id': portfolio_id, | |
| 'user_query': 'Analyse my portfolio', | |
| 'risk_tolerance': 'moderate', | |
| 'holdings': holdings, | |
| 'historical_prices': {}, | |
| 'fundamentals': {}, | |
| 'economic_data': {}, | |
| 'realtime_data': {}, | |
| 'technical_indicators': {}, | |
| 'optimisation_results': {}, | |
| 'risk_analysis': {}, | |
| 'ai_synthesis': '', | |
| 'recommendations': [], | |
| 'reasoning_steps': [], | |
| 'current_step': 'starting', | |
| 'errors': [], | |
| 'mcp_calls': [], | |
| 'phase_1_duration_ms': None, | |
| 'phase_1_5_duration_ms': None, | |
| 'phase_2_duration_ms': None, | |
| 'phase_2_5_duration_ms': None, | |
| 'phase_3_duration_ms': None, | |
| 'llm_input_tokens': None, | |
| 'llm_output_tokens': None, | |
| 'llm_total_tokens': None, | |
| 'llm_request_count': None, | |
| 'feature_vectors': {}, | |
| 'ensemble_forecasts': {}, | |
| 'sentiment_data': {}, | |
| } | |
| progress(0.2, desc="Phase 1: Fetching market data from Yahoo Finance, FMP, FRED...") | |
| # Create workflow with persona or roast mode | |
| # Note: persona takes precedence over roast mode | |
| if persona and persona != "standard": | |
| analysis_workflow = PortfolioAnalysisWorkflow(mcp_router, persona=persona) | |
| else: | |
| analysis_workflow = PortfolioAnalysisWorkflow(mcp_router, roast_mode=roast_mode) | |
| # Stream workflow execution with live progress updates | |
| final_state = None | |
| async for event in analysis_workflow.stream(initial_state): | |
| progress_val = event.get("progress", 0) | |
| message = event.get("message", "Processing...") | |
| progress(progress_val, desc=message) | |
| if event.get("event") == "complete": | |
| final_state = event.get("state") | |
| break | |
| final_state = event.get("state") | |
| if final_state is None: | |
| raise ValueError("Workflow did not return final state") | |
| LAST_ANALYSIS_STATE = final_state | |
| # Pre-generate export files immediately after analysis completes | |
| # This ensures DownloadButton has files ready on first click | |
| global LAST_EXPORT_PDF_PATH, LAST_EXPORT_CSV_PATH | |
| try: | |
| import tempfile | |
| # Generate PDF export | |
| pdf_bytes = export_analysis_to_pdf(final_state) | |
| with tempfile.NamedTemporaryFile( | |
| mode='wb', | |
| delete=False, | |
| suffix='.pdf', | |
| prefix='portfolio_analysis_', | |
| dir=tempfile.gettempdir() | |
| ) as f: | |
| f.write(pdf_bytes) | |
| LAST_EXPORT_PDF_PATH = f.name | |
| logger.info(f"Pre-generated PDF export: {LAST_EXPORT_PDF_PATH}") | |
| # Generate CSV export | |
| csv_content = export_analysis_to_csv(final_state) | |
| with tempfile.NamedTemporaryFile( | |
| mode='w', | |
| delete=False, | |
| suffix='.csv', | |
| prefix='portfolio_analysis_', | |
| dir=tempfile.gettempdir() | |
| ) as f: | |
| f.write(csv_content) | |
| LAST_EXPORT_CSV_PATH = f.name | |
| logger.info(f"Pre-generated CSV export: {LAST_EXPORT_CSV_PATH}") | |
| except Exception as e: | |
| logger.error(f"Failed to pre-generate export files: {e}") | |
| LAST_EXPORT_PDF_PATH = None | |
| LAST_EXPORT_CSV_PATH = None | |
| # Save analysis to database (Enhancement #4 - Historical Analysis Storage) | |
| # Only save for authenticated users - demo analyses are ephemeral | |
| if not is_demo: | |
| try: | |
| session = UserSession.from_dict(session_state) | |
| user_identifier = session.user_id if session and session.user_id else "unknown" | |
| logger.info(f"Saving analysis for portfolio {portfolio_id}, user: {user_identifier}") | |
| save_result = await db.save_analysis(portfolio_id, final_state) | |
| if save_result: | |
| logger.info(f"β Successfully saved analysis for portfolio {portfolio_id}") | |
| else: | |
| logger.error(f"β Failed to save analysis for portfolio {portfolio_id} (returned False)") | |
| # Note: We don't raise here because the analysis itself succeeded | |
| # The user still gets their results even if database save fails | |
| except Exception as e: | |
| logger.error(f"β Exception saving analysis for portfolio {portfolio_id}: {e}", exc_info=True) | |
| # Note: We don't raise here because the analysis itself succeeded | |
| else: | |
| logger.info(f"Demo mode: Analysis for portfolio {portfolio_id} not saved (ephemeral session)") | |
| charts = create_visualisations() | |
| analysis_text = format_analysis_result(final_state, holdings) | |
| performance_metrics = format_performance_metrics(final_state) | |
| progress(1.0, desc="Analysis complete!") | |
| return ( | |
| "results", | |
| analysis_text, | |
| performance_metrics, | |
| *charts, | |
| gr.update(visible=True) # analysis_audio_btn (show audio button) | |
| ) | |
| except Exception as e: | |
| logger.error(f"Analysis error: {e}", exc_info=True) | |
| return ( | |
| "input", | |
| f"β Error: {str(e)}", | |
| "", | |
| None, None, None, None, None, | |
| gr.update(visible=False) # analysis_audio_btn (hide on error) | |
| ) | |
def update_live_preview(portfolio_text: str) -> str:
    """Render an HTML preview of the parsed portfolio (no price lookup).

    The raw text is parsed with ``parse_portfolio_input`` and combined per
    ticker with ``aggregate_holdings``. The preview shows the number of
    unique tickers, how many are held by share count versus dollar amount,
    one row per ticker, and any warnings attached to the aggregated entries.

    All text that originates from the user's input (tickers, warning
    messages, exception text) is HTML-escaped before being embedded.

    Args:
        portfolio_text: Raw portfolio input text. ``None`` or blank input
            yields the "enter holdings" placeholder.

    Returns:
        HTML formatted preview of the portfolio, a placeholder when the
        input is empty, or an error panel when parsing fails.
    """
    # Local import: the file's top-level import block is not touched here.
    from html import escape

    if not portfolio_text or not portfolio_text.strip():
        return """
        <div style='text-align: center; padding: 2rem; color: #999;'>
            <p style='font-size: 14px;'>Enter portfolio holdings to see preview</p>
        </div>
        """

    try:
        holdings = parse_portfolio_input(portfolio_text)
        if not holdings:
            return """
            <div style='text-align: center; padding: 2rem; color: #ef4444;'>
                <p style='font-size: 14px;'>Unable to parse portfolio format</p>
            </div>
            """

        aggregated = aggregate_holdings(holdings)
        unique_tickers = len(aggregated)
        shares_tickers = sum(1 for d in aggregated.values() if d['total_quantity'] > 0)
        dollar_tickers = sum(1 for d in aggregated.values() if d['total_dollar_amount'] > 0)

        # Named ``warning_messages`` so the module-level ``warnings`` import
        # is not shadowed inside this function.
        warning_messages = [
            message
            for data in aggregated.values()
            for message in data['warnings']
        ]

        warnings_html = ""
        if warning_messages:
            warning_rows = "".join(
                f"<p style='margin: 0.25rem 0 0 0; font-size: 10px; opacity: 0.9;'>{escape(str(message))}</p>"
                for message in warning_messages
            )
            warnings_html = f"""
            <div style='margin-top: 0.75rem; padding: 0.75rem; background: rgba(251, 146, 60, 0.1); border: 1px solid rgba(251, 146, 60, 0.3); border-radius: 6px;'>
                <p style='margin: 0 0 0.5rem 0; font-size: 11px; text-transform: uppercase; color: #fb923c; font-weight: 600;'>Duplicate/Mixed Entries</p>
                {warning_rows}
            </div>
            """

        # Collect fragments and join once instead of repeated ``+=``.
        parts = [f"""
        <div style='padding: 1.5rem; display: flex; flex-direction: column;'>
            <h3 style='color: #048CFC; margin-bottom: 1rem; margin-top: 0; font-size: 1.25rem;'>Portfolio Preview</h3>
            <div style='display: grid; gap: 1rem; grid-template-rows: auto 1fr auto; height: 100%;'>
                <div style='border-bottom: 1px solid rgba(255,255,255,0.1); padding-bottom: 0.75rem;'>
                    <p style='margin: 0; font-size: 12px; opacity: 0.7; text-transform: uppercase;'>Unique Tickers</p>
                    <p style='margin: 0.25rem 0 0 0; font-size: 28px; font-weight: 600;'>{unique_tickers}</p>
                    <p style='margin: 0.5rem 0 0 0; font-size: 11px; opacity: 0.6;'>{shares_tickers} by shares • {dollar_tickers} by dollar</p>
                </div>
                <div style='padding-right: 0.5rem; overflow-y: auto; min-height: 0;'>
                    <p style='margin: 0 0 0.75rem 0; font-size: 12px; opacity: 0.7; text-transform: uppercase;'>Assets (Aggregated)</p>
        """]

        for ticker, data in sorted(aggregated.items()):
            quantity = data['total_quantity']
            dollars = data['total_dollar_amount']
            if dollars > 0 and quantity > 0:
                value_text = f"{quantity:.2f} sh + ${dollars:,.0f}"
            elif dollars > 0:
                value_text = f"${dollars:,.0f}"
            else:
                value_text = f"{quantity:.2f} sh"
            parts.append(f"""
                    <div style='display: flex; justify-content: space-between; padding: 0.65rem 0; border-bottom: 1px solid rgba(255,255,255,0.05);'>
                        <span style='font-weight: 500; font-size: 0.95rem;'>{escape(str(ticker))}</span>
                        <span style='opacity: 0.7; font-size: 0.9rem;'>{value_text}</span>
                    </div>
            """)

        parts.append(f"""
                </div>
                <div style='margin-top: 0.5rem; padding: 0.75rem; background: rgba(4, 140, 252, 0.1); border-radius: 6px; text-align: center;'>
                    <p style='margin: 0; font-size: 11px; opacity: 0.8;'>Click "Get Current Prices" for live valuation</p>
                </div>
                {warnings_html}
            </div>
        </div>
        """)
        return "".join(parts)
    except Exception as e:
        # UI boundary: never let a preview failure propagate to Gradio.
        logger.error(f"Preview update error: {e}", exc_info=True)
        return f"<div style='color: #ef4444; padding: 1rem;'><p style='font-size: 12px;'>Error: {escape(str(e))}</p></div>"
async def fetch_and_update_preview(portfolio_text: str) -> str:
    """Fetch current prices and render the preview with calculated values.

    The raw text is parsed with ``parse_portfolio_input`` and combined per
    ticker with ``aggregate_holdings``. Quotes are requested through
    ``mcp_router.call_yahoo_finance_mcp`` for every ticker that has a share
    quantity. A position's value is its dollar amount plus
    ``quantity * price``, so mixed (shares + dollars) positions are valued in
    full. If no price is available the share part contributes 0 and the row
    is labelled "price unavailable".

    All text that originates from the user's input (tickers, warning
    messages, exception text) is HTML-escaped before being embedded.

    Args:
        portfolio_text: Raw portfolio input text. ``None``, blank, or
            unparsable input falls back to ``update_live_preview``.

    Returns:
        HTML formatted preview with current prices, or an error panel if an
        unexpected exception occurs.
    """
    # Local import: the file's top-level import block is not touched here.
    from html import escape

    if not portfolio_text or not portfolio_text.strip():
        return update_live_preview(portfolio_text or "")

    try:
        holdings = parse_portfolio_input(portfolio_text)
        if not holdings:
            return update_live_preview(portfolio_text)

        aggregated = aggregate_holdings(holdings)

        # Every position holding shares needs a quote, including mixed
        # positions that also carry a dollar amount.
        tickers_needing_prices = [
            ticker for ticker, data in aggregated.items()
            if data['total_quantity'] > 0
        ]

        prices = {}
        if tickers_needing_prices:
            try:
                quote_results = await mcp_router.call_yahoo_finance_mcp(
                    tool="get_quote",
                    params={"tickers": tickers_needing_prices}
                )
                if quote_results and isinstance(quote_results, list):
                    for quote in quote_results:
                        if not (isinstance(quote, dict) and 'ticker' in quote and 'price' in quote):
                            continue
                        try:
                            price = float(quote['price'])
                        except (TypeError, ValueError):
                            # One bad quote must not discard the others.
                            logger.warning(f"Ignoring unparsable price for {quote['ticker']}: {quote['price']!r}")
                            continue
                        # Also filters NaN, which would poison the totals.
                        if price > 0:
                            prices[quote['ticker']] = price
            except Exception as e:
                # Best effort: render the preview without prices.
                logger.error(f"Error fetching prices: {e}")

        total_value = 0
        aggregated_values = {}
        for ticker, data in aggregated.items():
            value = 0
            if data['total_dollar_amount'] > 0:
                value += data['total_dollar_amount']
            if data['total_quantity'] > 0:
                value += data['total_quantity'] * prices.get(ticker, 0)
            aggregated_values[ticker] = value
            total_value += value

        unique_tickers = len(aggregated)

        # Named ``warning_messages`` so the module-level ``warnings`` import
        # is not shadowed inside this function.
        warning_messages = [
            message
            for data in aggregated.values()
            for message in data['warnings']
        ]

        warnings_html = ""
        if warning_messages:
            warning_rows = "".join(
                f"<p style='margin: 0.25rem 0 0 0; font-size: 10px; opacity: 0.9;'>{escape(str(message))}</p>"
                for message in warning_messages
            )
            warnings_html = f"""
            <div style='margin-top: 0.75rem; padding: 0.75rem; background: rgba(251, 146, 60, 0.1); border: 1px solid rgba(251, 146, 60, 0.3); border-radius: 6px;'>
                <p style='margin: 0 0 0.5rem 0; font-size: 11px; text-transform: uppercase; color: #fb923c; font-weight: 600;'>Duplicate/Mixed Entries</p>
                {warning_rows}
            </div>
            """

        # Collect fragments and join once instead of repeated ``+=``.
        parts = [f"""
        <div style='padding: 1.5rem; display: flex; flex-direction: column;'>
            <h3 style='color: #048CFC; margin-bottom: 1rem; margin-top: 0; font-size: 1.25rem;'>Portfolio Summary</h3>
            <div style='display: grid; gap: 1rem; grid-template-rows: auto auto 1fr auto; height: 100%;'>
                <div style='border-bottom: 1px solid rgba(255,255,255,0.1); padding-bottom: 0.75rem;'>
                    <p style='margin: 0; font-size: 12px; opacity: 0.7; text-transform: uppercase;'>Unique Tickers</p>
                    <p style='margin: 0.25rem 0 0 0; font-size: 28px; font-weight: 600;'>{unique_tickers}</p>
                </div>
                <div style='border-bottom: 1px solid rgba(255,255,255,0.1); padding-bottom: 0.75rem;'>
                    <p style='margin: 0; font-size: 12px; opacity: 0.7; text-transform: uppercase;'>Total Value</p>
                    <p style='margin: 0.25rem 0 0 0; font-size: 24px; font-weight: 600; color: #10b981;'>${total_value:,.2f}</p>
                </div>
                <div style='padding-right: 0.5rem; overflow-y: auto; min-height: 0;'>
                    <p style='margin: 0 0 0.75rem 0; font-size: 12px; opacity: 0.7; text-transform: uppercase;'>Holdings Breakdown (Aggregated)</p>
        """]

        for ticker in sorted(aggregated):
            data = aggregated[ticker]
            quantity = data['total_quantity']
            dollars = data['total_dollar_amount']
            price = prices.get(ticker, 0)
            current_value = aggregated_values.get(ticker, 0)
            weight = (current_value / total_value * 100) if total_value > 0 else 0

            if quantity > 0:
                if price > 0:
                    detail = f"{quantity:.2f} sh × ${price:.2f}"
                else:
                    detail = f"{quantity:.2f} sh (price unavailable)"
                if dollars > 0:
                    detail += f" + ${dollars:,.2f}"
            elif dollars > 0:
                detail = f"${dollars:,.2f}"
            else:
                detail = "—"

            parts.append(f"""
                    <div style='display: flex; justify-content: space-between; align-items: center; padding: 0.65rem 0; border-bottom: 1px solid rgba(255,255,255,0.05);'>
                        <div>
                            <div style='font-weight: 500; font-size: 0.95rem;'>{escape(str(ticker))}</div>
                            <div style='font-size: 11px; opacity: 0.6; margin-top: 2px;'>{detail}</div>
                        </div>
                        <div style='text-align: right;'>
                            <div style='font-weight: 600; color: #10b981; font-size: 0.95rem;'>${current_value:,.2f}</div>
                            <div style='font-size: 11px; opacity: 0.6; margin-top: 2px;'>{weight:.1f}%</div>
                        </div>
                    </div>
            """)

        parts.append(f"""
                </div>
                <div style='margin-top: 0.5rem; padding: 0.75rem; background: rgba(16, 185, 129, 0.1); border-radius: 6px; text-align: center;'>
                    <p style='margin: 0; font-size: 11px; color: #10b981;'>Prices updated</p>
                </div>
                {warnings_html}
            </div>
        </div>
        """)
        return "".join(parts)
    except Exception as e:
        # UI boundary: never let a preview failure propagate to Gradio.
        logger.error(f"Fetch preview error: {e}", exc_info=True)
        return f"<div style='color: #ef4444; padding: 1rem;'><p style='font-size: 12px;'>Error fetching prices: {escape(str(e))}</p></div>"
| def create_interface() -> gr.Blocks: | |
| """Create the main Gradio interface with single-page workflow. | |
| Returns: | |
| Gradio Blocks interface | |
| """ | |
| theme = get_financial_theme() | |
| # Custom CSS for full-width and responsive design | |
| custom_css = FINANCIAL_CSS + """ | |
| /* Full-width container */ | |
| .gradio-container { | |
| max-width: unset; | |
| width: 100%; | |
| padding: 1.5rem 2rem; | |
| } | |
| /* Sidebar Styling - Works with gr.Sidebar */ | |
| #main-sidebar { | |
| background: linear-gradient(135deg, #1e3a5f 0%, #2d5a7b 100%) !important; | |
| } | |
| #main-sidebar .sidebar-header { | |
| color: white !important; | |
| border-bottom: 2px solid rgba(255,255,255,0.2); | |
| padding-bottom: 10px; | |
| margin-bottom: 20px; | |
| } | |
| .nav-btn { | |
| width: 100%; | |
| margin-bottom: 10px; | |
| text-align: left !important; | |
| justify-content: flex-start !important; | |
| background: rgba(255,255,255,0.05) !important; | |
| border: 1px solid rgba(255,255,255,0.1) !important; | |
| color: white !important; | |
| } | |
| .nav-btn:hover { | |
| background: rgba(255,255,255,0.15) !important; | |
| border-color: rgba(255,255,255,0.3) !important; | |
| } | |
| .sidebar-header { | |
| color: white !important; | |
| border-bottom: 2px solid rgba(255,255,255,0.2); | |
| padding-bottom: 10px; | |
| margin-bottom: 20px; | |
| } | |
| /* Ensure all large buttons have consistent font size */ | |
| button.lg, | |
| .lg button { | |
| font-size: 1rem !important; | |
| font-weight: 600 !important; | |
| } | |
| /* Dark mode specific container adjustments - theme adaptive */ | |
| .dark .gradio-container { | |
| background-color: var(--body-background-fill-dark); | |
| } | |
| /* Responsive adjustments */ | |
| @media (max-width: 768px) { | |
| .gradio-container { | |
| padding: 1rem; | |
| } | |
| } | |
| /* Better spacing for input section */ | |
| .gr-group { | |
| padding: 24px; | |
| margin-bottom: 16px; | |
| } | |
| /* Format helper text styling - theme adaptive */ | |
| .format-helper { | |
| font-size: 14px; | |
| margin-top: 8px; | |
| } | |
| .dark .format-helper { | |
| color: var(--body-text-color-subdued); | |
| } | |
| /* Chart containers with proper sizing */ | |
| .plot-container { | |
| min-height: 400px; | |
| } | |
| /* Example buttons better spacing */ | |
| .gr-examples { | |
| margin-top: 16px; | |
| } | |
| /* Improved markdown sections - theme adaptive */ | |
| .dark .markdown-text { | |
| color: var(--body-text-color); | |
| } | |
| .dark .markdown-text h3 { | |
| color: var(--body-text-color); | |
| border-bottom: 1px solid var(--block-border-color); | |
| padding-bottom: 8px; | |
| } | |
| /* Hero section styling */ | |
| #hero-section { | |
| background: linear-gradient(135deg, #05478A 0%, #048CFC 50%, #11C7AA 100%); | |
| padding: 3rem 2rem; | |
| border-radius: 16px; | |
| text-align: center; | |
| margin-bottom: 2rem; | |
| color: white; | |
| } | |
| #hero-section h2 { | |
| font-size: 2.5rem; | |
| font-weight: 700; | |
| margin: 0 0 0.5rem 0; | |
| letter-spacing: -0.025em; | |
| } | |
| #hero-section p { | |
| font-size: 1.125rem; | |
| opacity: 0.95; | |
| margin: 0 0 1.5rem 0; | |
| font-weight: 400; | |
| } | |
| #hero-section .value-props { | |
| display: flex; | |
| justify-content: center; | |
| gap: 2rem; | |
| flex-wrap: wrap; | |
| margin-top: 1.5rem; | |
| } | |
| #hero-section .value-prop { | |
| display: flex; | |
| align-items: center; | |
| gap: 0.5rem; | |
| font-size: 0.95rem; | |
| font-weight: 500; | |
| } | |
| /* Compact feature cards for 1x4 layout */ | |
| .feature-card-compact { | |
| background: rgba(255, 255, 255, 0.05); | |
| backdrop-filter: blur(10px); | |
| -webkit-backdrop-filter: blur(10px); | |
| border: 1px solid rgba(255, 255, 255, 0.2); | |
| border-radius: 8px; | |
| padding: 1rem 0.75rem; | |
| box-shadow: 0 2px 4px rgba(0, 0, 0, 0.1); | |
| transition: transform 0.2s ease, box-shadow 0.2s ease, border-color 0.2s ease; | |
| text-align: center; | |
| min-height: 130px; | |
| display: flex; | |
| flex-direction: column; | |
| justify-content: center; | |
| } | |
| .feature-card-compact:hover { | |
| transform: translateY(-2px); | |
| box-shadow: 0 4px 8px rgba(0, 0, 0, 0.15); | |
| border-color: rgba(4, 140, 252, 0.4); | |
| } | |
| .feature-card-compact h4 { | |
| color: #048CFC; | |
| margin: 0.5rem 0 0.25rem 0; | |
| font-weight: 600; | |
| font-size: 0.9rem; | |
| } | |
| .feature-card-compact p { | |
| color: var(--body-text-color); | |
| opacity: 0.75; | |
| font-size: 0.75rem; | |
| margin: 0; | |
| line-height: 1.4; | |
| } | |
| .feature-icon-compact { | |
| font-size: 1.5rem; | |
| margin-bottom: 0.25rem; | |
| } | |
| /* Task selection cards - Light mode (default) */ | |
| .task-card { | |
| min-height: 160px; | |
| padding: 1.5rem 1rem; | |
| text-align: center; | |
| white-space: pre-line; | |
| transition: transform 0.2s ease, box-shadow 0.2s ease, border-color 0.2s ease; | |
| border-radius: 12px; | |
| background: white; | |
| border: 1px solid #e5e7eb; | |
| box-shadow: 0 2px 8px rgba(0, 0, 0, 0.08); | |
| color: #1f2937; | |
| } | |
| /* Dark mode task cards */ | |
| .dark .task-card { | |
| background: rgba(255, 255, 255, 0.05); | |
| border: 1px solid rgba(255, 255, 255, 0.2); | |
| box-shadow: none; | |
| color: inherit; | |
| } | |
| .task-card:hover:not(:disabled) { | |
| transform: translateY(-3px); | |
| box-shadow: 0 8px 20px rgba(0, 0, 0, 0.15); | |
| border-color: #288cfa; | |
| } | |
| .dark .task-card:hover:not(:disabled) { | |
| box-shadow: 0 6px 16px rgba(0, 0, 0, 0.2); | |
| border-color: rgba(4, 140, 252, 0.5); | |
| } | |
| .task-card:disabled { | |
| opacity: 0.5; | |
| cursor: not-allowed; | |
| } | |
| .task-card-icon { | |
| font-size: 2.5rem; | |
| margin-bottom: 0.75rem; | |
| } | |
| .task-card-title { | |
| font-size: 1.1rem; | |
| font-weight: 600; | |
| margin-bottom: 0.5rem; | |
| color: #1f2937; | |
| } | |
| .dark .task-card-title { | |
| color: inherit; | |
| } | |
| .task-card-description { | |
| font-size: 0.85rem; | |
| color: #4b5563; | |
| line-height: 1.4; | |
| } | |
| .dark .task-card-description { | |
| color: inherit; | |
| opacity: 0.8; | |
| } | |
| .coming-soon-badge { | |
| font-size: 0.7rem; | |
| background: rgba(255, 255, 255, 0.1); | |
| padding: 0.25rem 0.5rem; | |
| border-radius: 4px; | |
| margin-top: 0.5rem; | |
| display: inline-block; | |
| } | |
| /* Task page container */ | |
| .task-selection-container { | |
| max-width: 900px; | |
| margin: 0 auto; | |
| padding: 2rem; | |
| } | |
| /* Preview card - scrollable with wider layout */ | |
| .preview-card { | |
| background: rgba(255, 255, 255, 0.05); | |
| backdrop-filter: blur(10px); | |
| border: 1px solid rgba(255, 255, 255, 0.2); | |
| border-radius: 12px; | |
| padding: 0; | |
| flex: 1 1 auto !important; | |
| max-height: 800px !important; | |
| min-height: 0; | |
| overflow-y: auto; | |
| overflow-x: hidden; | |
| display: flex !important; | |
| flex-direction: column !important; | |
| box-sizing: border-box !important; | |
| } | |
| /* Custom scrollbar for preview card */ | |
| .preview-card::-webkit-scrollbar { | |
| width: 8px; | |
| } | |
| .preview-card::-webkit-scrollbar-track { | |
| background: rgba(255, 255, 255, 0.05); | |
| border-radius: 4px; | |
| } | |
| .preview-card::-webkit-scrollbar-thumb { | |
| background: rgba(4, 140, 252, 0.3); | |
| border-radius: 4px; | |
| } | |
| .preview-card::-webkit-scrollbar-thumb:hover { | |
| background: rgba(4, 140, 252, 0.5); | |
| } | |
| /* Firefox */ | |
| .preview-card { | |
| scrollbar-width: thin; | |
| scrollbar-color: rgba(4, 140, 252, 0.3) rgba(255, 255, 255, 0.05); | |
| } | |
| /* Scrollable markdown for bull/bear cases */ | |
| .scrollable-markdown { | |
| max-height: 400px; | |
| overflow-y: auto; | |
| padding-right: 10px; | |
| border: 1px solid rgba(255, 255, 255, 0.1); | |
| border-radius: 8px; | |
| padding: 1rem; | |
| background: rgba(255, 255, 255, 0.02); | |
| } | |
| .scrollable-markdown::-webkit-scrollbar { | |
| width: 6px; | |
| } | |
| .scrollable-markdown::-webkit-scrollbar-track { | |
| background: rgba(255, 255, 255, 0.05); | |
| border-radius: 3px; | |
| } | |
| .scrollable-markdown::-webkit-scrollbar-thumb { | |
| background: rgba(4, 140, 252, 0.3); | |
| border-radius: 3px; | |
| } | |
| /* Input card - unified with examples */ | |
| .input-card { | |
| background: rgba(255, 255, 255, 0.05); | |
| backdrop-filter: blur(10px); | |
| border: 1px solid rgba(255, 255, 255, 0.2); | |
| border-radius: 12px; | |
| padding: 1.5rem; | |
| margin-bottom: 0; | |
| flex: 1 1 auto !important; | |
| display: flex !important; | |
| flex-direction: column !important; | |
| overflow-y: visible !important; | |
| overflow-x: visible !important; | |
| max-height: none !important; | |
| min-height: 0; | |
| box-sizing: border-box !important; | |
| gap: 0 !important; | |
| } | |
| /* Custom scrollbar for input card */ | |
| .input-card::-webkit-scrollbar { | |
| width: 8px; | |
| } | |
| .input-card::-webkit-scrollbar-track { | |
| background: rgba(255, 255, 255, 0.05); | |
| border-radius: 4px; | |
| } | |
| .input-card::-webkit-scrollbar-thumb { | |
| background: rgba(4, 140, 252, 0.3); | |
| border-radius: 4px; | |
| } | |
| .input-card::-webkit-scrollbar-thumb:hover { | |
| background: rgba(4, 140, 252, 0.5); | |
| } | |
| /* Firefox scrollbar */ | |
| .input-card { | |
| scrollbar-width: thin; | |
| scrollbar-color: rgba(4, 140, 252, 0.3) rgba(255, 255, 255, 0.05); | |
| } | |
| .examples-header { | |
| margin-top: 0.25rem; | |
| margin-bottom: 0.25rem; | |
| padding-top: 0.5rem; | |
| border-top: 1px solid rgba(255, 255, 255, 0.1); | |
| color: #048CFC; | |
| font-weight: 600; | |
| font-size: 1rem; | |
| } | |
| /* Portfolio row - natural heights without stretching */ | |
| .portfolio-row { | |
| display: flex !important; | |
| flex-direction: row !important; | |
| align-items: stretch !important; | |
| height: 100% !important; | |
| min-height: 600px !important; | |
| } | |
| .portfolio-row > .gr-column { | |
| display: flex !important; | |
| flex-direction: column !important; | |
| flex: 1 1 0% !important; | |
| height: 100% !important; | |
| min-height: 100% !important; | |
| max-height: 100% !important; | |
| gap: 0 !important; | |
| overflow: visible !important; | |
| } | |
| .portfolio-row > * { | |
| flex: 1 !important; | |
| } | |
| .portfolio-row .block { | |
| display: flex !important; | |
| flex-direction: column !important; | |
| flex: 1 !important; | |
| } | |
| /* Button fixed sizing - prevent vertical expansion */ | |
| .portfolio-row button { | |
| flex: 0 0 45px !important; | |
| width: 100% !important; | |
| height: 45px !important; | |
| min-height: 45px !important; | |
| max-height: 45px !important; | |
| } | |
| /* Flex items min-height to prevent overflow */ | |
| .input-card > *:not([class*="dropdown"]), | |
| .preview-card > *:not([class*="dropdown"]) { | |
| min-height: 0; | |
| } | |
| /* Dropdown styling and z-index */ | |
| .gr-dropdown { | |
| z-index: 999 !important; | |
| position: relative !important; | |
| } | |
| .gr-dropdown-container { | |
| z-index: 999 !important; | |
| overflow: visible !important; | |
| position: relative !important; | |
| } | |
| /* Dropdown menu positioning - appear below button */ | |
| .gr-dropdown ul[role="listbox"], | |
| .gr-dropdown .options, | |
| .gr-dropdown-menu { | |
| z-index: 9999 !important; | |
| overflow: visible !important; | |
| position: absolute !important; | |
| top: 100% !important; | |
| left: 0 !important; | |
| width: 100% !important; | |
| max-height: 300px !important; | |
| overflow-y: auto !important; | |
| } | |
| /* Loading page styling */ | |
| .loading-header { | |
| animation: fadeIn 0.5s ease-in; | |
| } | |
| .loading-message { | |
| text-align: center; | |
| font-size: 1.1rem; | |
| color: #048CFC; | |
| padding: 2rem; | |
| min-height: 100px; | |
| display: flex; | |
| align-items: center; | |
| justify-content: center; | |
| } | |
| @keyframes fadeIn { | |
| from { | |
| opacity: 0; | |
| transform: translateY(-10px); | |
| } | |
| to { | |
| opacity: 1; | |
| transform: translateY(0); | |
| } | |
| } | |
| /* Example buttons styling */ | |
| .gr-examples button { | |
| background: linear-gradient(135deg, rgba(4, 140, 252, 0.1), rgba(17, 199, 170, 0.1)); | |
| border: 1px solid rgba(4, 140, 252, 0.3); | |
| border-radius: 8px; | |
| padding: 0.75rem 1rem; | |
| font-size: 0.9rem; | |
| transition: all 0.2s ease; | |
| } | |
| .gr-examples button:hover { | |
| background: linear-gradient(135deg, rgba(4, 140, 252, 0.2), rgba(17, 199, 170, 0.2)); | |
| border-color: rgba(4, 140, 252, 0.6); | |
| transform: translateY(-2px); | |
| } | |
| /* Mobile responsive - feature cards stack on small screens */ | |
| @media (max-width: 1024px) { | |
| .feature-card-compact { | |
| padding: 1.25rem; | |
| min-height: auto; | |
| } | |
| } | |
| /* Responsive adjustments */ | |
| @media (max-width: 768px) { | |
| #hero-section { | |
| padding: 2rem 1rem; | |
| } | |
| #hero-section h2 { | |
| font-size: 1.75rem; | |
| } | |
| #hero-section p { | |
| font-size: 1rem; | |
| } | |
| #hero-section .value-props { | |
| gap: 1rem; | |
| } | |
| .feature-card-compact { | |
| padding: 1.25rem; | |
| } | |
| .preview-card { | |
| max-height: 400px; | |
| } | |
| } | |
| /* Accessibility - WCAG AA Colour Contrast Improvements */ | |
| .demo-subtitle { | |
| color: rgba(255, 255, 255, 0.95) !important; /* Increased from 0.7 for better contrast */ | |
| font-size: 14px; | |
| } | |
| .disclaimer-text { | |
| color: rgba(255, 255, 255, 0.95) !important; /* Increased contrast */ | |
| font-size: 13px; | |
| } | |
| /* Update all placeholder text for better visibility */ | |
| input::placeholder, textarea::placeholder { | |
| color: rgba(255, 255, 255, 0.7) !important; /* Increased from 0.5 */ | |
| } | |
| /* Add focus indicators for keyboard navigation */ | |
| input:focus, textarea:focus, button:focus, select:focus { | |
| outline: 3px solid #4A9EFF !important; | |
| outline-offset: 2px; | |
| } | |
| /* Validation message colours with better contrast */ | |
| #signup-email-validation, #password-match-validation { | |
| font-size: 0.85rem; | |
| margin-top: 0.25rem; | |
| font-weight: 500; | |
| } | |
| """ | |
| with gr.Blocks( | |
| title="Portfolio Intelligence Platform", | |
| theme=theme, | |
| css=custom_css, | |
| fill_width=True, | |
| fill_height=True | |
| ) as demo: | |
| # Session state for authentication | |
| session_state = gr.State({}) | |
| # Pagination state for history views | |
| history_current_page = gr.State(1) | |
| history_current_page_results = gr.State(1) | |
| # Hero Section with auth status | |
| with gr.Row(): | |
| gr.HTML( | |
| """ | |
| <div id="hero-section"> | |
| <h2>Portfolio Intelligence Platform</h2> | |
| <p>AI-powered portfolio analysis with transparent multi-agent MCP orchestration</p> | |
| <div class="value-props"> | |
| <div class="value-prop"> | |
| <span>9 MCP Servers</span> | |
| </div> | |
| <div class="value-prop"> | |
| <span>Quantitative Models</span> | |
| </div> | |
| <div class="value-prop"> | |
| <span>Claude Sonnet 4.5</span> | |
| </div> | |
| </div> | |
| </div> | |
| """, | |
| elem_classes="hero-container" | |
| ) | |
| # User info and logout button | |
| with gr.Row(): | |
| with gr.Column(scale=4): | |
| user_info = gr.Markdown("Not logged in", elem_id="user-info") | |
| with gr.Column(scale=1): | |
| logout_btn = gr.Button("Sign Out", variant="secondary", size="sm", visible=False) | |
| # Navigation sidebar using Gradio's native component | |
| with gr.Sidebar(position="left", open=False, visible=False, elem_id="main-sidebar") as sidebar: | |
| gr.Markdown("## Navigation", elem_classes="sidebar-header") | |
| # Navigation options | |
| nav_new_analysis = gr.Button("π Home", variant="secondary", size="lg", elem_classes="nav-btn") | |
| nav_view_history = gr.Button("π View History", variant="secondary", size="lg", elem_classes="nav-btn") | |
| gr.Markdown("---") | |
| # User section (when logged in) | |
| with gr.Group(visible=False) as sidebar_user_section: | |
| gr.Markdown("### Account") | |
| sidebar_user_email = gr.Markdown("") | |
| nav_profile_btn = gr.Button("π€ Profile", variant="secondary", size="sm") | |
| nav_signout_btn = gr.Button("πͺ Sign Out", variant="secondary", size="sm") | |
| # Authentication container | |
| with gr.Group(visible=True, elem_id="auth-container") as login_container: | |
| gr.Markdown("## π Sign In to Continue") | |
| gr.Markdown("Please sign in or create an account to analyse your portfolio") | |
| with gr.Tabs(): | |
| # Login Tab | |
| with gr.Tab("Sign In"): | |
| login_email = gr.Textbox( | |
| label="Email Address", | |
| placeholder="[email protected]", | |
| type="email", | |
| elem_id="login-email" | |
| ) | |
| login_password = gr.Textbox( | |
| label="Password", | |
| placeholder="Enter your password", | |
| type="password", | |
| elem_id="login-password" | |
| ) | |
| forgot_password_btn = gr.Button( | |
| "Forgot password?", | |
| variant="secondary", | |
| size="sm", | |
| elem_classes="forgot-password-link" | |
| ) | |
| login_btn = gr.Button("Sign In", variant="primary", size="lg") | |
| login_message = gr.Markdown("") | |
| # Signup Tab | |
| with gr.Tab("Create Account"): | |
| signup_username = gr.Textbox( | |
| label="Username (optional)", | |
| placeholder="Choose a username", | |
| elem_id="signup-username" | |
| ) | |
| signup_email = gr.Textbox( | |
| label="Email Address", | |
| placeholder="[email protected]", | |
| type="email", | |
| elem_id="signup-email" | |
| ) | |
| signup_email_validation = gr.Markdown("", elem_id="signup-email-validation") | |
| signup_password = gr.Textbox( | |
| label="Password", | |
| placeholder="At least 15 characters (NIST 2024 requirement)", | |
| type="password", | |
| elem_id="signup-password" | |
| ) | |
| signup_password_strength = gr.Markdown("", elem_id="password-strength") | |
| signup_confirm = gr.Textbox( | |
| label="Confirm Password", | |
| placeholder="Re-enter your password", | |
| type="password", | |
| elem_id="signup-confirm-password" | |
| ) | |
| signup_password_match = gr.Markdown("", elem_id="password-match-validation") | |
| gr.Markdown( | |
| "*By signing up, you agree this is for educational purposes only and not financial advice.*", | |
| elem_classes="disclaimer-text" | |
| ) | |
| signup_btn = gr.Button("Create Account", variant="primary", size="lg") | |
| signup_message = gr.Markdown("") | |
| # Demo mode option | |
| gr.Markdown("---") | |
| gr.Markdown("## π― Or Try Without Signing Up") | |
| demo_btn = gr.Button( | |
| "Try Demo", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| gr.Markdown("*1 free analysis per day β’ No account required*", elem_classes="demo-subtitle") | |
| demo_message = gr.Markdown("") | |
| # Modal 1: Request password reset (only email input) | |
| with gr.Group(visible=False, elem_id="password-reset-modal") as password_reset_modal: | |
| gr.Markdown("## Request Password Reset") | |
| gr.Markdown("Enter your email address and we'll send you a password reset link.") | |
| reset_email = gr.Textbox( | |
| label="Email Address", | |
| placeholder="[email protected]", | |
| type="email", | |
| elem_id="reset-email" | |
| ) | |
| with gr.Row(): | |
| reset_submit_btn = gr.Button("Send Reset Link", variant="primary", size="lg") | |
| reset_cancel_btn = gr.Button("Cancel", variant="secondary", size="lg") | |
| reset_message = gr.Markdown("") | |
| # Modal 2: Set new password (only appears after clicking email link) | |
| with gr.Group(visible=False, elem_id="password-update-modal") as password_update_modal: | |
| gr.Markdown("## Set New Password") | |
| gr.Markdown("Enter your new password below.") | |
| # Hidden fields to store recovery token_hash and email (populated by JavaScript) | |
| recovery_token_hash = gr.Textbox(visible=False, elem_id="recovery-token-hash") | |
| recovery_email = gr.Textbox( | |
| label="Email Address", | |
| placeholder="[email protected]", | |
| type="email", | |
| info="Enter the email address you used to request the password reset", | |
| elem_id="recovery-email" | |
| ) | |
| new_password = gr.Textbox( | |
| label="New Password", | |
| placeholder="At least 15 characters (NIST 2024 requirement)", | |
| type="password", | |
| elem_id="new-password" | |
| ) | |
| confirm_new_password = gr.Textbox( | |
| label="Confirm New Password", | |
| placeholder="Re-enter your new password", | |
| type="password", | |
| elem_id="confirm-new-password" | |
| ) | |
| with gr.Row(): | |
| update_password_btn = gr.Button("Update Password", variant="primary", size="lg") | |
| update_cancel_btn = gr.Button("Cancel", variant="secondary", size="lg") | |
| update_password_message = gr.Markdown("") | |
| # Task Selection Page (shown after authentication) | |
| with gr.Group(visible=False, elem_classes="task-selection-container") as task_page: | |
| gr.Markdown("## What would you like to do?", elem_classes="section-title") | |
| with gr.Row(equal_height=True): | |
| # Analyse Portfolio - enabled | |
| with gr.Column(scale=1, min_width=200): | |
| task_analyse_btn = gr.Button( | |
| value="π\n\nAnalyse Portfolio\n\nGet comprehensive analysis of your current holdings", | |
| elem_classes=["task-card"], | |
| variant="primary", | |
| size="lg" | |
| ) | |
| # Build Portfolio - enabled | |
| with gr.Column(scale=1, min_width=200): | |
| task_build_btn = gr.Button( | |
| value="ποΈ\n\nBuild Portfolio\n\nAI helps construct a portfolio based on your goals", | |
| elem_classes=["task-card"], | |
| variant="primary", | |
| interactive=True, | |
| size="lg" | |
| ) | |
| # Compare Strategies - enabled | |
| with gr.Column(scale=1, min_width=200): | |
| task_compare_btn = gr.Button( | |
| value="βοΈ\n\nCompare Strategies\n\nSee bull vs bear perspectives on your investments", | |
| elem_classes=["task-card"], | |
| variant="primary", | |
| interactive=True, | |
| size="lg" | |
| ) | |
| # Test Changes | |
| with gr.Column(scale=1, min_width=200): | |
| task_test_btn = gr.Button( | |
| value="π¬\n\nTest Changes\n\nPreview impact before making portfolio changes", | |
| elem_classes=["task-card"], | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| # Build Portfolio Page (shown when Build Portfolio task selected) | |
| with gr.Group(visible=False, elem_classes="build-portfolio-container") as build_page: | |
| gr.Markdown("## Build Your Portfolio", elem_classes="section-title") | |
| gr.Markdown("Tell us about your investment goals and we'll build a portfolio for you.") | |
| with gr.Row(equal_height=True): | |
| # Left column: Goal settings | |
| with gr.Column(scale=2): | |
| with gr.Group(elem_classes="input-card"): | |
| build_goals = gr.CheckboxGroup( | |
| choices=["Growth", "Income", "Capital Preservation", "Speculation"], | |
| label="Investment Goals", | |
| info="Select one or more investment objectives" | |
| ) | |
| build_risk_tolerance = gr.Slider( | |
| minimum=1, | |
| maximum=10, | |
| value=5, | |
| step=1, | |
| label="Risk Tolerance", | |
| info="1 = Very Conservative, 10 = Very Aggressive" | |
| ) | |
| build_constraints = gr.Textbox( | |
| label="Constraints (Optional)", | |
| placeholder="e.g., 'No crypto, ESG focused, minimum 10 stocks, max 5% per position'", | |
| lines=3, | |
| info="Any specific requirements or restrictions" | |
| ) | |
| with gr.Row(): | |
| build_submit_btn = gr.Button( | |
| "Build My Portfolio", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| build_back_btn = gr.Button( | |
| "Back", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| # Right column: Results | |
| with gr.Column(scale=3): | |
| # Agent activity stream | |
| build_agent_chat = gr.Chatbot( | |
| label="π€ Agent Activity", | |
| type="messages", | |
| height=400, | |
| visible=False, | |
| show_copy_button=True, | |
| elem_classes="agent-chat" | |
| ) | |
| with gr.Group(elem_classes="preview-card", visible=False) as build_results_container: | |
| build_status = gr.Markdown("", elem_classes="build-status") | |
| # Audio narration button and player | |
| with gr.Row(): | |
| build_audio_btn = gr.Button( | |
| "π Listen to Portfolio", | |
| variant="secondary", | |
| size="sm", | |
| visible=False | |
| ) | |
| with gr.Row(): | |
| build_audio_player = gr.Audio( | |
| label="Portfolio Summary Audio", | |
| interactive=False, | |
| visible=False, | |
| show_download_button=True | |
| ) | |
| with gr.Row(): | |
| build_regenerate_btn = gr.Button("Regenerate", variant="secondary", size="sm") | |
| # Compare Strategies Page (shown when Compare Strategies task selected) | |
| with gr.Group(visible=False, elem_classes="compare-strategies-container") as compare_page: | |
| gr.Markdown("## Compare Strategies", elem_classes="section-title") | |
| gr.Markdown("Get bull and bear perspectives on your portfolio through multi-agent debate.") | |
| with gr.Row(equal_height=True): | |
| # Left column: Portfolio input | |
| with gr.Column(scale=2): | |
| with gr.Group(elem_classes="input-card"): | |
| compare_portfolio_input = gr.Textbox( | |
| placeholder="AAPL 50\nTSLA 25 shares\nNVDA $5000", | |
| label="Portfolio Holdings", | |
| lines=6, | |
| info="Enter your holdings to analyse" | |
| ) | |
| with gr.Row(): | |
| compare_submit_btn = gr.Button( | |
| "Run Analysis", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| compare_back_btn = gr.Button( | |
| "Back", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| # Right column: Results | |
| with gr.Column(scale=3): | |
| # Debate activity stream | |
| compare_debate_chat = gr.Chatbot( | |
| label="π Advisory Council Debate", | |
| type="messages", | |
| height=500, | |
| visible=False, | |
| show_copy_button=True, | |
| elem_classes="debate-chat" | |
| ) | |
| with gr.Group(elem_classes="preview-card", visible=False) as compare_results_container: | |
| compare_status = gr.Markdown("", elem_classes="compare-status") | |
| # Bull vs Bear side-by-side | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Bull Case") | |
| compare_bull_case = gr.Markdown("", elem_classes="scrollable-markdown") | |
| compare_bull_confidence = gr.Number(label="Confidence %", interactive=False) | |
| with gr.Column(): | |
| gr.Markdown("### Bear Case") | |
| compare_bear_case = gr.Markdown("", elem_classes="scrollable-markdown") | |
| compare_bear_confidence = gr.Number(label="Confidence %", interactive=False) | |
| # Consensus | |
| gr.Markdown("### Consensus Recommendation") | |
| compare_consensus = gr.Markdown("") | |
| compare_stance = gr.Textbox(label="Stance", interactive=False) | |
| # Audio debate button and player | |
| with gr.Row(): | |
| compare_audio_btn = gr.Button( | |
| "π Listen to Debate", | |
| variant="secondary", | |
| size="sm", | |
| visible=False | |
| ) | |
| with gr.Row(): | |
| compare_audio_player = gr.Audio( | |
| label="Advisory Council Debate Audio", | |
| interactive=False, | |
| visible=False, | |
| show_download_button=True | |
| ) | |
| # Debate transcript | |
| with gr.Accordion("View Full Debate", open=False): | |
| compare_debate_transcript = gr.JSON(label="Debate Rounds") | |
| # Test Changes Page (shown when Test Changes task selected) | |
| with gr.Group(visible=False, elem_classes="test-changes-container") as test_page: | |
| gr.Markdown("## Test Portfolio Changes", elem_classes="section-title") | |
| gr.Markdown("Preview the impact of changes before making them.") | |
| with gr.Row(equal_height=True): | |
| # Left column: Portfolio and changes input | |
| with gr.Column(scale=2): | |
| with gr.Group(elem_classes="input-card"): | |
| test_portfolio_input = gr.Textbox( | |
| placeholder="AAPL 30%\nTSLA 25%\nNVDA 25%\nBND 20%", | |
| label="Current Portfolio (ticker weight%)", | |
| lines=6, | |
| info="Enter your current portfolio allocations" | |
| ) | |
| test_changes_input = gr.Textbox( | |
| placeholder="sell TSLA 10\nbuy VTI 10", | |
| label="Proposed Changes", | |
| lines=4, | |
| info="Format: action ticker amount (e.g., 'sell TSLA 10' or 'buy VTI 15')" | |
| ) | |
| test_portfolio_value = gr.Number( | |
| label="Portfolio Value ($)", | |
| value=100000, | |
| info="Total portfolio value for calculations" | |
| ) | |
| with gr.Row(): | |
| test_submit_btn = gr.Button( | |
| "Run Simulation", | |
| variant="primary", | |
| size="lg" | |
| ) | |
| test_back_btn = gr.Button( | |
| "Back", | |
| variant="secondary", | |
| size="lg" | |
| ) | |
| # Right column: Results | |
| with gr.Column(scale=3): | |
| with gr.Group(elem_classes="preview-card", visible=False) as test_results_container: | |
| test_status = gr.Markdown("", elem_classes="test-status") | |
| # Side-by-side metrics comparison | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Current Portfolio") | |
| test_current_metrics = gr.Dataframe( | |
| headers=["Metric", "Value"], | |
| label="Current Metrics", | |
| interactive=False | |
| ) | |
| with gr.Column(): | |
| gr.Markdown("### After Changes") | |
| test_simulated_metrics = gr.Dataframe( | |
| headers=["Metric", "Value"], | |
| label="Simulated Metrics", | |
| interactive=False | |
| ) | |
| # Impact summary | |
| gr.Markdown("### Impact Analysis") | |
| test_impact_summary = gr.Dataframe( | |
| headers=["Metric", "Current", "Simulated", "Change", "% Change"], | |
| label="Impact Summary", | |
| interactive=False | |
| ) | |
| # Stress test comparison | |
| with gr.Accordion("Stress Test Comparison", open=False): | |
| test_stress_comparison = gr.Dataframe( | |
| headers=["Scenario", "Shock %", "Current Loss", "Simulated Loss", "Improvement"], | |
| label="Stress Tests", | |
| interactive=False | |
| ) | |
| # Recommendations and assessment | |
| test_assessment = gr.Markdown("", elem_classes="test-assessment") | |
| test_recommendations = gr.Markdown("", label="Recommendations") | |
| # Input Page with side-by-side layout (hidden until authenticated) | |
| with gr.Group(visible=False) as input_page: | |
| # Side-by-side input and preview (2:3 ratio for wider preview) | |
| with gr.Row(equal_height=True, elem_classes="portfolio-row"): | |
| # Left column: Input + Examples (unified card) | |
| with gr.Column(scale=2): | |
| with gr.Group(elem_classes="input-card"): | |
| portfolio_input = gr.Textbox( | |
| placeholder="AAPL 50\nTSLA 25 shares\nNVDA $5000\nGLD 10 shares", | |
| label="Portfolio Holdings", | |
| lines=8, | |
| info="Enter your holdings below (see examples for format)" | |
| ) | |
| # Load past portfolio | |
| load_past_portfolio_dropdown = gr.Dropdown( | |
| choices=[], | |
| label="Load Past Portfolio (Last 3)", | |
| interactive=True, | |
| elem_id="load-past-portfolio-dropdown", | |
| info="Your 3 most recent portfolios" | |
| ) | |
| # Persona Selection | |
| persona_choices = [ | |
| ("Standard Analysis", "standard"), | |
| ("Warren Buffett - Value Investing", "warren_buffett"), | |
| ("Cathie Wood - Innovation & Growth", "cathie_wood"), | |
| ("Ray Dalio - Macro & Diversification", "ray_dalio"), | |
| ] | |
| persona_dropdown = gr.Dropdown( | |
| choices=persona_choices, | |
| value="standard", | |
| label="Analysis Persona", | |
| info="Choose an investor perspective for the analysis" | |
| ) | |
| # Roast Mode Toggle | |
| roast_mode_toggle = gr.Checkbox( | |
| label="Roast Mode", | |
| value=False, | |
| info="Enable brutal honesty mode for portfolio critique (only works with Standard Analysis)" | |
| ) | |
| # Action buttons | |
| with gr.Row(): | |
| analyse_btn = gr.Button( | |
| "Analyse Portfolio", | |
| variant="primary", | |
| size="lg", | |
| min_width=290 | |
| ) | |
| input_back_btn = gr.Button( | |
| "Back", | |
| variant="secondary", | |
| size="lg", | |
| min_width=290 | |
| ) | |
| # Examples integrated as simple buttons | |
| gr.Markdown("### Example Portfolios", elem_classes="examples-header") | |
| with gr.Row(): | |
| tech_btn = gr.Button("Tech Growth", size="sm", scale=1) | |
| conservative_btn = gr.Button("Conservative Income", size="sm", scale=1) | |
| balanced_btn = gr.Button("Balanced 60/40", size="sm", scale=1) | |
| global_btn = gr.Button("Global Diversified", size="sm", scale=1) | |
| risk_btn = gr.Button("Single Stock Risk", size="sm", scale=1) | |
| # Right column: Live preview (wider at scale=3) | |
| with gr.Column(scale=3): | |
| preview_output = gr.HTML( | |
| value=update_live_preview(""), | |
| elem_classes="preview-card" | |
| ) | |
| refresh_prices_btn = gr.Button( | |
| "π Get Current Prices", | |
| variant="primary", | |
| size="sm", | |
| scale=1 | |
| ) | |
| # JavaScript to detect password reset token_hash in URL and populate hidden fields | |
| # This runs on page load and checks for recovery token in query parameters | |
| demo.load( | |
| fn=None, # No Python processing needed - JavaScript directly sets the value | |
| inputs=[], | |
| outputs=[recovery_token_hash], | |
| js=""" | |
| function() { | |
| // Check for recovery token_hash in URL query parameters (PKCE flow) | |
| const params = new URLSearchParams(window.location.search); | |
| const tokenHash = params.get('token_hash') || ''; | |
| const typeParam = params.get('type') || ''; | |
| if (tokenHash && typeParam === 'recovery') { | |
| console.log('Password reset token detected'); | |
| // Clean up URL (remove query parameters) | |
| window.history.replaceState({}, document.title, window.location.pathname); | |
| return tokenHash; | |
| } | |
| return ''; | |
| } | |
| """ | |
| ) | |
| # Add click handlers for example buttons | |
| tech_btn.click( | |
| fn=lambda: "AAPL 50 shares\nTSLA 25 shares\nNVDA 30 shares\nMETA 20 shares", | |
| outputs=portfolio_input, | |
| show_api=False | |
| ) | |
| conservative_btn.click( | |
| fn=lambda: "VOO 100 shares\nVTI 75 shares\nSCHD 50 shares\nTLT 40 shares\nVXUS 60 shares", | |
| outputs=portfolio_input, | |
| show_api=False | |
| ) | |
| balanced_btn.click( | |
| fn=lambda: "VTI $25000\nVXUS $15000\nBND $15000\nGLD $5000", | |
| outputs=portfolio_input, | |
| show_api=False | |
| ) | |
| global_btn.click( | |
| fn=lambda: "VTI $15000\nVXUS $10000\nVWO $5000\nBND $10000\nGLD $3000\nVNQ $2000", | |
| outputs=portfolio_input, | |
| show_api=False | |
| ) | |
| risk_btn.click( | |
| fn=lambda: "TSLA 100 shares", | |
| outputs=portfolio_input, | |
| show_api=False | |
| ) | |
| # Feature highlights grid (1x4 compact layout) | |
| gr.Markdown("### Key Features", elem_classes="section-title") | |
| with gr.Row(): | |
| with gr.Column(scale=1, min_width=200): | |
| gr.HTML( | |
| """ | |
| <div class="feature-card-compact"> | |
| <div class="feature-icon-compact">π§ </div> | |
| <h4>Smart Analysis</h4> | |
| <p>HRP, Black-Litterman, VaR/CVaR</p> | |
| </div> | |
| """ | |
| ) | |
| with gr.Column(scale=1, min_width=200): | |
| gr.HTML( | |
| """ | |
| <div class="feature-card-compact"> | |
| <div class="feature-icon-compact">β‘</div> | |
| <h4>Fast Results</h4> | |
| <p>1-2 minutes</p> | |
| </div> | |
| """ | |
| ) | |
| with gr.Column(scale=1, min_width=200): | |
| gr.HTML( | |
| """ | |
| <div class="feature-card-compact"> | |
| <div class="feature-icon-compact">π</div> | |
| <h4>Secure</h4> | |
| <p>No data stored</p> | |
| </div> | |
| """ | |
| ) | |
| with gr.Column(scale=1, min_width=200): | |
| gr.HTML( | |
| """ | |
| <div class="feature-card-compact"> | |
| <div class="feature-icon-compact">β¨</div> | |
| <h4>AI Powered</h4> | |
| <p>Claude Sonnet 4.5</p> | |
| </div> | |
| """ | |
| ) | |
| # Loading Page | |
| with gr.Group(visible=False) as loading_page: | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.HTML( | |
| """ | |
| <div style='text-align: center; padding: 4rem 2rem;'> | |
| <div style='font-size: 3rem; margin-bottom: 1rem;'>β‘</div> | |
| <h2 style='color: #048CFC; margin-bottom: 1rem;'>Analysing Your Portfolio</h2> | |
| </div> | |
| """, | |
| elem_classes="loading-header" | |
| ) | |
| loading_message = gr.Markdown( | |
| value="Initialising analysis...", | |
| elem_classes="loading-message", | |
| ) | |
| gr.HTML( | |
| """ | |
| <div style='text-align: center; padding: 2rem; color: #999; font-size: 14px;'> | |
| <p>This may take 1-2 minutes as we run deep learning models on your portfolio</p> | |
| </div> | |
| """ | |
| ) | |
| # Results Page (tabbed interface) | |
| with gr.Group(visible=False) as results_page: | |
| # Header with actions (always visible) | |
| with gr.Row(elem_id="results-header"): | |
| with gr.Column(scale=3): | |
| gr.Markdown("# Portfolio Analysis Results", elem_classes="page-title") | |
| with gr.Column(scale=1): | |
| with gr.Row(): | |
| export_pdf_btn = gr.DownloadButton("π Export PDF", size="sm") | |
| export_csv_btn = gr.DownloadButton("π Export CSV", size="sm") | |
| # Main tabbed interface | |
| with gr.Tabs() as results_tabs: | |
| # Tab 1: Analysis Results | |
| with gr.Tab("π Analysis Results"): | |
| with gr.Column(): | |
| analysis_output = gr.Markdown("") | |
| # Audio narration button and player | |
| with gr.Row(): | |
| analysis_audio_btn = gr.Button( | |
| "π Listen to Analysis", | |
| variant="secondary", | |
| size="sm", | |
| visible=False | |
| ) | |
| with gr.Row(): | |
| analysis_audio_player = gr.Audio( | |
| label="Audio Summary", | |
| interactive=False, | |
| visible=False, | |
| show_download_button=True | |
| ) | |
| # Performance Metrics Accordion (progressive disclosure) | |
| with gr.Accordion("Performance Metrics & Reasoning", open=False): | |
| performance_metrics_output = gr.Markdown("") | |
| # Tab 2: Dashboard | |
| with gr.Tab("π Dashboard"): | |
| # Top row - Portfolio allocation + Risk metrics | |
| with gr.Row(equal_height=True): | |
| with gr.Column(scale=1, min_width=400): | |
| allocation_plot = gr.Plot(label="Portfolio Allocation", container=True) | |
| with gr.Column(scale=1, min_width=400): | |
| risk_plot = gr.Plot(label="Risk Metrics Dashboard", container=True) | |
| # Middle row - Performance chart | |
| with gr.Row(): | |
| performance_plot = gr.Plot(label="Historical Performance", container=True) | |
| # Bottom row - Correlation + Optimisation | |
| with gr.Row(equal_height=True): | |
| with gr.Column(scale=1, min_width=400): | |
| correlation_plot = gr.Plot(label="Asset Correlation Matrix", container=True) | |
| with gr.Column(scale=1, min_width=400): | |
| optimization_plot = gr.Plot(label="Optimisation Methods Comparison", container=True) | |
| # Tab 3: Tax Analysis | |
| with gr.Tab("π° Tax Analysis"): | |
| gr.Markdown("Analyse tax implications and identify tax-loss harvesting opportunities.") | |
| # Top row: Input controls | |
| with gr.Row(): | |
| tax_filing_status = gr.Dropdown( | |
| choices=[ | |
| ("Single", "single"), | |
| ("Married Filing Jointly", "married_joint"), | |
| ("Married Filing Separately", "married_separate"), | |
| ("Head of Household", "head_of_household"), | |
| ], | |
| value="single", | |
| label="Filing Status", | |
| info="Your tax filing status" | |
| ) | |
| tax_annual_income = gr.Slider( | |
| minimum=0, | |
| maximum=1000000, | |
| value=75000, | |
| step=5000, | |
| label="Annual Income ($)", | |
| info="Total taxable income" | |
| ) | |
| tax_cost_basis_method = gr.Dropdown( | |
| choices=[ | |
| ("First In, First Out (FIFO)", "fifo"), | |
| ("Last In, First Out (LIFO)", "lifo"), | |
| ("Highest In, First Out (HIFO)", "hifo"), | |
| ("Average Cost", "average"), | |
| ], | |
| value="fifo", | |
| label="Cost Basis Method", | |
| info="Method for calculating gains/losses" | |
| ) | |
| # Cost basis input section | |
| gr.Markdown("### Purchase Information") | |
| gr.Markdown( | |
| "Enter your purchase details for accurate tax calculations. " | |
| "Click 'Load Holdings' to pre-populate with current holdings, then fill in purchase prices and dates." | |
| ) | |
| with gr.Row(): | |
| tax_load_holdings_btn = gr.Button("Load Holdings", size="sm") | |
| gr.Markdown( | |
| "*Accepted date formats: 9/3/17, Sep 3, 2017, 2017-09-03, etc.*", | |
| elem_classes="text-muted" | |
| ) | |
| tax_cost_basis_input = gr.Dataframe( | |
| headers=["Ticker", "Shares", "Purchase Price", "Purchase Date"], | |
| datatype=["str", "number", "number", "str"], | |
| col_count=(4, "fixed"), | |
| row_count=(5, "dynamic"), | |
| interactive=True, | |
| label="Cost Basis Information", | |
| value=[], | |
| ) | |
| tax_calculate_btn = gr.Button("Calculate Tax Impact", variant="primary") | |
| # Bottom row: Analysis output | |
| tax_analysis_output = gr.Markdown("") | |
| # Tab 4: Stress Testing | |
| with gr.Tab("π² Stress Testing"): | |
| gr.Markdown("Test your portfolio's resilience against historical crises and market scenarios.") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| scenario_choices = [ | |
| ("Monte Carlo Simulation (10,000 paths)", "monte_carlo"), | |
| ("All Scenarios Comparison", "all_scenarios"), | |
| ("2008 Financial Crisis", "2008_financial_crisis"), | |
| ("COVID-19 Pandemic (2020)", "covid_2020"), | |
| ("Dot-com Bubble (2000-2002)", "dotcom_bubble"), | |
| ("European Debt Crisis (2011)", "european_debt_2011"), | |
| ("China Devaluation (2015)", "china_2015"), | |
| ("Inflation Shock (2022)", "inflation_2022"), | |
| ("Severe Recession (Hypothetical)", "severe_recession"), | |
| ("Stagflation (Hypothetical)", "stagflation"), | |
| ("Flash Crash (Hypothetical)", "flash_crash"), | |
| ] | |
| stress_scenario_dropdown = gr.Dropdown( | |
| choices=scenario_choices, | |
| value="monte_carlo", | |
| label="Select Stress Scenario", | |
| info="Choose a historical crisis or simulation method" | |
| ) | |
| with gr.Row(): | |
| stress_n_sims = gr.Slider( | |
| minimum=1000, | |
| maximum=50000, | |
| value=10000, | |
| step=1000, | |
| label="Monte Carlo Simulations", | |
| info="More simulations = more accuracy (slower)" | |
| ) | |
| stress_horizon = gr.Slider( | |
| minimum=30, | |
| maximum=756, | |
| value=252, | |
| step=30, | |
| label="Time Horizon (days)", | |
| info="1 year = 252 trading days" | |
| ) | |
| stress_test_btn = gr.Button("Run Stress Test", variant="primary", size="lg") | |
| # Stress test results | |
| with gr.Column(): | |
| stress_summary = gr.Markdown("", visible=True) | |
| with gr.Tabs(): | |
| with gr.Tab("Dashboard"): | |
| stress_dashboard_plot = gr.Plot(label="Stress Test Dashboard") | |
| with gr.Tab("Monte Carlo Paths"): | |
| stress_mc_plot = gr.Plot(label="Monte Carlo Simulation Paths") | |
| with gr.Tab("Scenario Comparison"): | |
| stress_scenario_plot = gr.Plot(label="All Scenarios Comparison") | |
| with gr.Tab("Drawdown Analysis"): | |
| stress_drawdown_plot = gr.Plot(label="Maximum Drawdown Distribution") | |
| # Tab 5: History (embedded) | |
| with gr.Tab("π History"): | |
| gr.Markdown("View your previous portfolio analyses") | |
| # Search and filters | |
| with gr.Row(): | |
| history_search_results = gr.Textbox( | |
| placeholder="Search by ticker, date...", | |
| label="Search", | |
| scale=3, | |
| elem_id="history-search-results" | |
| ) | |
| history_date_filter_results = gr.Dropdown( | |
| choices=[ | |
| ("All Time", "all"), | |
| ("Last 7 Days", "7d"), | |
| ("Last 30 Days", "30d"), | |
| ("Last 90 Days", "90d"), | |
| ], | |
| value="all", | |
| label="Date Range", | |
| scale=1 | |
| ) | |
| history_refresh_btn_results = gr.Button("π Refresh", size="sm", scale=1) | |
| history_table_results = gr.Dataframe( | |
| headers=["ID", "Date", "Holdings", "Risk Tolerance", "AI Synthesis Preview"], | |
| datatype=["str", "str", "str", "str", "str"], | |
| interactive=False, | |
| wrap=True, | |
| elem_id="history-table-results", | |
| column_widths=["5%", "15%", "25%", "15%", "40%"] | |
| ) | |
| # Pagination controls | |
| with gr.Row(): | |
| history_prev_btn_results = gr.Button("β Previous", size="sm", scale=1) | |
| history_page_info_results = gr.Markdown("Page 1 of 1", elem_id="history-page-info-results") | |
| history_next_btn_results = gr.Button("Next β", size="sm", scale=1) | |
| with gr.Accordion("Selected Analysis Details", open=False): | |
| history_details_output_results = gr.Markdown("") | |
| # History Page (Enhancement #4 - Historical Analysis Storage) | |
| with gr.Group(visible=False) as history_page: | |
| gr.Markdown("### Analysis History") | |
| gr.Markdown("View your previous portfolio analyses") | |
| with gr.Row(): | |
| with gr.Column(scale=4): | |
| # Search and filters | |
| with gr.Row(): | |
| history_search = gr.Textbox( | |
| placeholder="Search by ticker, date...", | |
| label="Search", | |
| scale=3, | |
| elem_id="history-search" | |
| ) | |
| history_date_filter = gr.Dropdown( | |
| choices=[ | |
| ("All Time", "all"), | |
| ("Last 7 Days", "7d"), | |
| ("Last 30 Days", "30d"), | |
| ("Last 90 Days", "90d"), | |
| ], | |
| value="all", | |
| label="Date Range", | |
| scale=1 | |
| ) | |
| history_refresh_btn = gr.Button("π Refresh", size="sm", scale=1) | |
| with gr.Column(scale=1): | |
| back_to_input_btn = gr.Button("New Analysis", variant="secondary") | |
| history_table = gr.Dataframe( | |
| headers=["ID", "Date", "Holdings", "Risk Tolerance", "AI Synthesis Preview"], | |
| datatype=["str", "str", "str", "str", "str"], | |
| interactive=False, | |
| wrap=True, | |
| elem_id="history-table", | |
| column_widths=["5%", "15%", "25%", "15%", "40%"] | |
| ) | |
| # Pagination controls | |
| with gr.Row(): | |
| history_prev_btn = gr.Button("β Previous", size="sm", scale=1) | |
| history_page_info = gr.Markdown("Page 1 of 1", elem_id="history-page-info") | |
| history_next_btn = gr.Button("Next β", size="sm", scale=1) | |
| with gr.Accordion("Selected Analysis Details", open=False) as history_details: | |
| history_details_output = gr.Markdown("") | |
| view_history_btn = gr.Button("View Analysis History", variant="secondary", visible=False) | |
| # Event handlers | |
def _single_page_visible(active_page):
    """Build a visibility update that shows ``active_page`` and hides its siblings.

    Args:
        active_page: The page container (one of the seven groups listed
            below) that should become visible.

    Returns:
        A dict mapping each of the seven page containers to a ``gr.update``
        whose ``visible`` flag is True only for ``active_page``.
    """
    # Same seven containers, in the same order, as the hand-written dicts this
    # helper replaces; keeping the list in one place means a new page only
    # has to be registered once.
    switchable_pages = (
        task_page,
        input_page,
        results_page,
        history_page,
        build_page,
        compare_page,
        test_page,
    )
    return {
        page: gr.update(visible=page is active_page)
        for page in switchable_pages
    }


def show_input_page():
    """Show the portfolio input page and hide the other switchable pages."""
    return _single_page_visible(input_page)


def show_task_page():
    """Show the task selection page and hide the other switchable pages."""
    return _single_page_visible(task_page)


def show_build_page():
    """Show the Build Portfolio page and hide the other switchable pages."""
    return _single_page_visible(build_page)


def show_compare_page():
    """Show the Compare Strategies page and hide the other switchable pages."""
    return _single_page_visible(compare_page)


def show_test_page():
    """Show the Test Changes page and hide the other switchable pages."""
    return _single_page_visible(test_page)
| # ============================================================ | |
| # AUDIO GENERATION HANDLERS | |
| # ============================================================ | |
async def generate_analysis_audio():
    """Generate audio narration for the latest portfolio analysis on demand.

    Reads the module-level ``LAST_ANALYSIS_STATE`` and asks ``TTSService`` to
    narrate the first 1000 characters of its ``ai_synthesis`` text together
    with its ``recommendations``.

    Returns:
        A 2-tuple of ``gr.update`` payloads, in the order (audio player,
        trigger button). On success the player is made visible with the
        generated audio as its value; on any failure the player is hidden.
        The button is kept visible in every case.
    """
    global LAST_ANALYSIS_STATE

    def _player_hidden():
        # Shared failure result: hide the player, keep the button usable.
        return (
            gr.update(visible=False),  # audio_player
            gr.update(visible=True),  # button
        )

    if not LAST_ANALYSIS_STATE:
        logger.warning("No analysis state available for audio generation")
        return _player_hidden()

    try:
        from backend.audio.tts_service import TTSService

        tts = TTSService()
        if not tts.is_available():
            logger.warning("TTS service not available")
            return _player_hidden()

        # Extract analysis text. The `or` fallbacks also cover keys that are
        # present but hold None; a plain .get() default would let None through
        # and the slice below would raise TypeError even when recommendations
        # alone could still be narrated.
        ai_synthesis = LAST_ANALYSIS_STATE.get("ai_synthesis") or ""
        recommendations = LAST_ANALYSIS_STATE.get("recommendations") or []
        if not ai_synthesis and not recommendations:
            logger.warning("No analysis content available")
            return _player_hidden()

        logger.info("Generating audio for analysis...")
        audio_path = await tts.generate_analysis_narration(
            analysis_text=ai_synthesis[:1000],  # Limit to 1000 chars
            recommendations=recommendations,
        )
        logger.info(f"Audio generated: {audio_path}")
        return (
            gr.update(value=audio_path, visible=True),  # Show audio player
            gr.update(visible=True),  # Keep button visible
        )
    except Exception as e:
        # Best-effort feature: never propagate to the UI, but keep the
        # traceback in the log so failures can be diagnosed.
        logger.exception(f"Audio generation failed: {e}")
        return _player_hidden()
async def generate_build_audio():
    """Produce an on-demand audio narration of the most recent built portfolio.

    Reads the module-level ``LAST_BUILD_RESULT`` and narrates its ``summary``
    (first 1000 characters) together with its ``holdings``.

    Returns:
        Tuple of two Gradio updates: (audio player, generate button).  The
        player is shown with the generated file only on success; the button
        is kept visible in every outcome.
    """
    global LAST_BUILD_RESULT

    # Shared result for all early exits: player hidden, button still shown.
    player_hidden = (
        gr.update(visible=False),
        gr.update(visible=True),
    )

    if not LAST_BUILD_RESULT:
        logger.warning("No build result available for audio generation")
        return player_hidden

    try:
        from backend.audio.tts_service import TTSService

        narrator = TTSService()
        if not narrator.is_available():
            logger.warning("TTS service not available")
            return player_hidden

        # Pull the narration inputs out of the stored build result.
        summary_text = LAST_BUILD_RESULT.get("summary", "")
        portfolio_holdings = LAST_BUILD_RESULT.get("holdings", [])
        if not summary_text:
            logger.warning("No portfolio summary available")
            return player_hidden

        logger.info("Generating audio for portfolio...")
        narration_path = await narrator.generate_portfolio_narration(
            portfolio_summary=summary_text[:1000],
            holdings=portfolio_holdings,
        )
        logger.info(f"Audio generated: {narration_path}")

        player_shown = gr.update(value=narration_path, visible=True)
        button_shown = gr.update(visible=True)
        return (player_shown, button_shown)
    except Exception as e:
        logger.error(f"Audio generation failed: {e}")
        return player_hidden
async def generate_debate_audio():
    """Produce multi-speaker debate audio on demand.

    Reads the module-level ``LAST_DEBATE_DATA`` (bull case, bear case,
    consensus, confidences and stance) and passes the three texts, each
    limited to 1000 characters, to ``DebateAudioGenerator``.

    Returns:
        Tuple of two Gradio updates: (audio player, generate button).  The
        player is shown with the generated file only on success; the button
        is kept visible in every outcome.
    """
    global LAST_DEBATE_DATA

    # Shared result for all early exits: player hidden, button still shown.
    player_hidden = (
        gr.update(visible=False),
        gr.update(visible=True),
    )

    logger.info(f"generate_debate_audio called. LAST_DEBATE_DATA exists: {LAST_DEBATE_DATA is not None}")
    if not LAST_DEBATE_DATA:
        logger.warning("No debate data available for audio generation")
        return player_hidden

    # Diagnostic summary of what is about to be narrated.
    logger.info(f"Debate data keys: {LAST_DEBATE_DATA.keys()}")
    for label, field in (
        ("Bull case", "bull_case"),
        ("Bear case", "bear_case"),
        ("Consensus", "consensus"),
    ):
        logger.info(f"{label} length: {len(LAST_DEBATE_DATA.get(field, ''))}")

    try:
        from backend.audio.tts_service import DebateAudioGenerator

        generator = DebateAudioGenerator()
        if not generator.is_available():
            logger.warning("Debate audio generator not available")
            return player_hidden

        debate = LAST_DEBATE_DATA
        bull_text = debate.get("bull_case", "")
        bear_text = debate.get("bear_case", "")
        consensus_text = debate.get("consensus", "")
        bull_conf = debate.get("bull_confidence")
        bear_conf = debate.get("bear_confidence")
        final_stance = debate.get("stance")

        # All three texts are required for a complete debate.
        if not (bull_text and bear_text and consensus_text):
            logger.warning("Incomplete debate data")
            return player_hidden

        logger.info("Generating debate audio...")
        debate_path = await generator.generate_debate_audio(
            bull_case=bull_text[:1000],
            bear_case=bear_text[:1000],
            consensus=consensus_text[:1000],
            bull_confidence=bull_conf,
            bear_confidence=bear_conf,
            stance=final_stance,
        )
        logger.info(f"Debate audio generated: {debate_path}")
        logger.info(f"Audio file exists: {os.path.exists(debate_path) if debate_path else False}")
        logger.info("Returning audio player update with visible=True")

        player_shown = gr.update(value=debate_path, visible=True)
        button_shown = gr.update(visible=True)
        return (player_shown, button_shown)
    except Exception as e:
        logger.error(f"Debate audio generation failed: {e}")
        return player_hidden
async def handle_build_portfolio(goals, risk_tolerance, constraints, session_state):
    """Handle the Build Portfolio workflow with streaming updates.

    Args:
        goals: Selected investment goals (a list, or a single value).
        risk_tolerance: Risk tolerance score 1-10 (converted with ``int``).
        constraints: User constraints as text, or a list of strings that is
            joined with ", ".
        session_state: User session (not read by this handler).

    Yields:
        Tuple of UI updates:
        (build_agent_chat, build_results_container, build_status,
        build_audio_btn)
    """
    global LAST_BUILD_RESULT

    logger.info("handle_build_portfolio called")
    logger.info(f"Input types - goals: {type(goals).__name__}, risk_tolerance: {type(risk_tolerance).__name__}, constraints: {type(constraints).__name__}")
    logger.info(f"Input values - goals: {goals!r}, risk_tolerance: {risk_tolerance!r}, constraints: {constraints!r}")

    if not goals:
        logger.warning("No goals provided")
        yield (
            gr.update(value=[], visible=False),  # build_agent_chat (empty, hidden)
            gr.update(visible=True),  # build_results_container
            "Please select at least one investment goal.",  # build_status
            gr.update(visible=False),  # build_audio_btn (hide on error)
        )
        return

    # Bound before the ``try`` so the error handler can always report them.
    chat_messages = []
    message_count = 0

    # NOTE: ``traceback`` must NOT be imported inside this function. A
    # function-local import makes the name local to the whole body, so any
    # use that runs before the import statement raises UnboundLocalError and
    # hides the real failure. The module-level import is used instead.
    try:
        logger.info("Initialising MCP and workflow routers")
        from backend.mcp_router import MCPRouter
        from backend.agents.workflow_router import WorkflowRouter

        workflow_router = WorkflowRouter(MCPRouter())

        # Normalise input types (``goals`` is known to be truthy here).
        goals_list = goals if isinstance(goals, list) else [goals]
        constraints_str = ", ".join(constraints) if isinstance(constraints, list) else (constraints or "")
        logger.info(f"After normalisation - goals_list: {goals_list!r} (type: {type(goals_list).__name__})")
        logger.info(f"After normalisation - constraints_str: {constraints_str!r} (type: {type(constraints_str).__name__})")

        # Stream the build workflow
        logger.info("Starting stream from route_build_stream")
        async for message in workflow_router.route_build_stream(
            goals=goals_list,
            risk_tolerance=int(risk_tolerance),
            constraints=constraints_str,
        ):
            try:
                message_count += 1
                logger.debug(f"Received message #{message_count} from stream")
                logger.debug(f"Message type: {type(message).__name__}")
                logger.debug(f"Message keys: {message.keys() if isinstance(message, dict) else 'N/A'}")
                logger.debug(f"Message content type: {type(message.get('content')).__name__ if isinstance(message, dict) else 'N/A'}")
                logger.debug(f"Message: {message!r}")
                chat_messages.append(message)
                logger.debug(f"Message appended to chat_messages. Total messages: {len(chat_messages)}")
                try:
                    # Stream each message to the chatbot in real-time
                    logger.debug(f"About to yield UI update with {len(chat_messages)} messages")
                    yield (
                        gr.update(value=chat_messages, visible=True),  # build_agent_chat (visible, growing list)
                        gr.update(visible=False),  # build_results_container (hidden during streaming)
                        "",  # build_status
                        gr.update(visible=False),  # build_audio_btn (hide during streaming)
                    )
                    logger.debug("UI update yielded successfully")
                except Exception as e:
                    logger.error(f"Error yielding UI update: {e}")
                    logger.error(f"chat_messages type: {type(chat_messages).__name__}")
                    logger.error(f"chat_messages length: {len(chat_messages)}")
                    logger.error(f"Last message: {chat_messages[-1] if chat_messages else 'N/A'}")
                    raise
            except Exception as e:
                logger.error(f"Error processing message #{message_count}")
                logger.error(f"Message: {message!r}")
                logger.error(f"Error type: {type(e).__name__}")
                logger.error(f"Error message: {str(e)}")
                logger.error(f"Full traceback:\n{traceback.format_exc()}")
                raise

        # Store build result for audio generation
        if chat_messages:
            final_message = chat_messages[-1]
            if isinstance(final_message, dict) and "metadata" in final_message:
                # The key may be present with a None value.
                metadata = final_message.get("metadata") or {}
                portfolio_data = metadata.get("portfolio", {})
                # Handle both dict and list formats
                if isinstance(portfolio_data, list):
                    holdings = portfolio_data
                elif isinstance(portfolio_data, dict):
                    holdings = portfolio_data.get("holdings", [])
                else:
                    holdings = []
                LAST_BUILD_RESULT = {
                    "summary": final_message.get("content", ""),
                    "holdings": holdings,
                    "reasoning": metadata.get("reasoning_trace", []),
                }
                logger.info("Build result stored for audio generation")

        # Final yield: Show results container
        logger.info(f"Completed streaming. Yielding final result with {len(chat_messages)} messages")
        yield (
            gr.update(value=chat_messages, visible=True),  # build_agent_chat (final state, visible)
            gr.update(visible=True),  # build_results_container (show results)
            "Portfolio built successfully!",  # build_status
            gr.update(visible=True),  # build_audio_btn (show audio button)
        )
    except Exception as e:
        logger.error("=== BUILD PORTFOLIO ERROR ===")
        logger.error(f"Error type: {type(e).__name__}")
        logger.error(f"Error message: {str(e)}")
        # Per-frame breakdown of where the failure travelled through.
        for frame_summary in traceback.extract_tb(e.__traceback__):
            logger.error(f" File: {frame_summary.filename}")
            logger.error(f" Function: {frame_summary.name}")
            logger.error(f" Line {frame_summary.lineno}: {frame_summary.line}")
        logger.error(f"Full traceback:\n{traceback.format_exc()}")
        logger.error(f"message_count at error: {message_count}")
        logger.error(f"chat_messages at error: {len(chat_messages)} messages")
        logger.error("=== END BUILD PORTFOLIO ERROR ===")
        yield (
            gr.update(value=[], visible=False),  # build_agent_chat (empty, hidden on error)
            gr.update(visible=True),  # build_results_container
            f"Error building portfolio: {str(e)}",  # build_status
            gr.update(visible=False),  # build_audio_btn (hide on error)
        )
def handle_build_accept(portfolio_table):
    """Accept built portfolio and populate input for analysis.

    Args:
        portfolio_table: Built portfolio as a sequence of rows (ticker in
            column 0, allocation in column 1) or a pandas ``DataFrame``
            with the same column order.  ``None`` or an empty table yields
            an empty portfolio text.

    Returns:
        Tuple of values for each output component: the portfolio text
        (one ``TICKER ALLOCATION%`` line per row) followed by visibility
        updates for the task, input, results, history, build, compare and
        test pages.  Only the input page is made visible.
    """
    if portfolio_table is None or len(portfolio_table) == 0:
        rows = []
    elif hasattr(portfolio_table, "itertuples"):
        # Iterating a DataFrame directly yields its column labels, not its
        # rows, so walk the rows explicitly as plain tuples.
        rows = portfolio_table.itertuples(index=False, name=None)
    else:
        rows = portfolio_table

    # Convert portfolio table to input format
    lines = []
    for row in rows:
        ticker = row[0] if len(row) > 0 else ""
        allocation = row[1] if len(row) > 1 else 0
        if ticker:
            lines.append(f"{ticker} {allocation}%")
    portfolio_text = "\n".join(lines)

    return (
        portfolio_text,  # portfolio_input
        gr.update(visible=False),  # task_page
        gr.update(visible=True),  # input_page
        gr.update(visible=False),  # results_page
        gr.update(visible=False),  # history_page
        gr.update(visible=False),  # build_page
        gr.update(visible=False),  # compare_page
        gr.update(visible=False),  # test_page
    )
async def handle_compare_portfolio(portfolio_text, session_state):
    """Handle the Compare Strategies workflow with streaming debate.

    Args:
        portfolio_text: Raw portfolio text input.
        session_state: User session (not read by this handler).

    Yields:
        Tuple of UI updates: (debate_chat, results_container, status,
        bull_case, bull_confidence, bear_case, bear_confidence, consensus,
        stance, debate_transcript, audio_button)
    """
    global LAST_DEBATE_DATA

    def _no_debate(status):
        """Build the UI tuple for a run that produced no debate output."""
        return (
            gr.update(value=[], visible=False),  # compare_debate_chat (empty, hidden)
            gr.update(visible=True),  # compare_results_container
            status,  # compare_status
            "",  # compare_bull_case
            0,  # compare_bull_confidence
            "",  # compare_bear_case
            0,  # compare_bear_confidence
            "",  # compare_consensus
            "",  # compare_stance
            [],  # compare_debate_transcript
            gr.update(visible=False),  # compare_audio_btn (hide on error)
        )

    def _title_of(msg):
        """Return a message's metadata title, or "" when it has none."""
        # ``metadata`` may be missing or explicitly None on some messages.
        meta = msg.get("metadata") if isinstance(msg, dict) else None
        return (meta or {}).get("title") or ""

    if not portfolio_text or not portfolio_text.strip():
        yield _no_debate("Please enter your portfolio holdings.")
        return

    # Parse portfolio
    holdings = parse_portfolio_input(portfolio_text)
    if not holdings:
        yield _no_debate("Could not parse portfolio. Please check format.")
        return

    try:
        # Initialise MCP router and workflow router
        from backend.mcp_router import MCPRouter
        from backend.agents.workflow_router import WorkflowRouter

        workflow_router = WorkflowRouter(MCPRouter())

        # Stream the compare workflow
        chat_messages = []
        bull_case_data = {}
        bear_case_data = {}
        consensus_data = {}

        async for message in workflow_router.route_compare_stream(holdings=holdings):
            chat_messages.append(message)
            # Stream each message to the chatbot in real-time
            yield (
                gr.update(value=chat_messages, visible=True),  # compare_debate_chat (visible, growing list)
                gr.update(visible=False),  # compare_results_container (hidden during streaming)
                "",  # compare_status
                "",  # compare_bull_case
                0,  # compare_bull_confidence
                "",  # compare_bear_case
                0,  # compare_bear_confidence
                "",  # compare_consensus
                "",  # compare_stance
                [],  # compare_debate_transcript
                gr.update(visible=False),  # compare_audio_btn (hide during streaming)
            )

            # Only the consensus message triggers result extraction.
            if "Consensus" not in _title_of(message):
                continue

            consensus_data = {
                "recommendation": message.get("content", ""),
                "stance": "Mixed",
            }
            # Extract bull/bear from the messages streamed so far
            for msg in chat_messages:
                title = _title_of(msg)
                if "Bull Researcher" in title:
                    bull_case_data = {
                        "thesis": msg.get("content", ""),
                        "confidence": 65,  # Default, could parse from title
                    }
                elif "Bear Researcher" in title:
                    bear_case_data = {
                        "thesis": msg.get("content", ""),
                        "confidence": 60,  # Default, could parse from title
                    }

        # Store debate data for audio generation
        audio_ready = bool(consensus_data and bull_case_data and bear_case_data)
        if audio_ready:
            LAST_DEBATE_DATA = {
                "bull_case": bull_case_data.get("thesis", ""),
                "bear_case": bear_case_data.get("thesis", ""),
                "consensus": consensus_data.get("recommendation", ""),
                "bull_confidence": bull_case_data.get("confidence"),
                "bear_confidence": bear_case_data.get("confidence"),
                "stance": consensus_data.get("stance", "Mixed"),
            }
            logger.info("Debate data stored for audio generation")

        # Final yield: Show results container with analysis
        yield (
            gr.update(value=chat_messages, visible=True),  # compare_debate_chat (final state, visible)
            gr.update(visible=True),  # compare_results_container (show results)
            "Analysis complete!",  # compare_status
            bull_case_data.get("thesis", ""),  # compare_bull_case
            bull_case_data.get("confidence", 0),  # compare_bull_confidence
            bear_case_data.get("thesis", ""),  # compare_bear_case
            bear_case_data.get("confidence", 0),  # compare_bear_confidence
            consensus_data.get("recommendation", ""),  # compare_consensus
            consensus_data.get("stance", "Mixed"),  # compare_stance
            chat_messages,  # compare_debate_transcript (use chat messages)
            # Offer narration only when this run stored its own debate data;
            # otherwise a click would narrate whatever an earlier run left
            # in LAST_DEBATE_DATA.
            gr.update(visible=audio_ready),  # compare_audio_btn
        )
    except Exception as e:
        logger.exception(f"Compare portfolio error: {e}")
        yield _no_debate(f"Error: {str(e)}")
async def handle_test_changes(portfolio_text, changes_text, portfolio_value, session_state):
    """Handle the Test Changes workflow.

    Args:
        portfolio_text: Current portfolio as text, one ``TICKER WEIGHT%``
            entry per line.
        changes_text: Proposed changes as text, one ``buy/sell TICKER AMOUNT``
            entry per line.
        portfolio_value: Total portfolio value (converted with ``float``).
        session_state: User session (not read by this handler).

    Returns:
        Tuple of UI updates for test results: (results container update,
        status message, current metrics rows, simulated metrics rows,
        impact rows, stress-test rows, assessment markdown,
        recommendations markdown).  Validation problems and unexpected
        errors are reported through the status message with empty data.
    """

    def _status_only(status):
        """Build the result tuple for a run that produced no simulation."""
        return (gr.update(visible=True), status, [], [], [], [], "", "")

    try:
        # Guard against None as well as blank text so a missing value gets
        # the validation message rather than a generic error.
        if not portfolio_text or not portfolio_text.strip():
            return _status_only("Please enter your current portfolio.")
        if not changes_text or not changes_text.strip():
            return _status_only("Please enter proposed changes.")

        # Parse current portfolio; lines that do not fit the format are skipped.
        current_portfolio = {}
        for raw_line in portfolio_text.strip().split("\n"):
            parts = raw_line.replace("%", "").split()
            if len(parts) < 2:
                continue
            try:
                weight = float(parts[1])
            except ValueError:
                continue
            current_portfolio[parts[0].upper()] = {"weight": weight}

        # Parse proposed changes; only "buy" and "sell" actions are accepted.
        proposed_changes = []
        for raw_line in changes_text.strip().split("\n"):
            parts = raw_line.lower().split()
            if len(parts) < 3 or parts[0] not in ("buy", "sell"):
                continue
            try:
                amount = float(parts[2])
            except ValueError:
                continue
            proposed_changes.append({
                "action": parts[0],
                "ticker": parts[1].upper(),
                "amount": amount,
            })

        if not current_portfolio:
            return _status_only("Could not parse portfolio. Use format: TICKER WEIGHT%")
        if not proposed_changes:
            return _status_only("Could not parse changes. Use format: buy/sell TICKER AMOUNT")

        # Initialise MCP router and workflow router
        from backend.mcp_router import MCPRouter
        from backend.agents.workflow_router import WorkflowRouter

        workflow_router = WorkflowRouter(MCPRouter())

        # Run the test workflow
        result = await workflow_router.route_test(
            current_portfolio=current_portfolio,
            proposed_changes=proposed_changes,
            portfolio_value=float(portfolio_value),
        )

        def _metric_rows(section):
            """Format the numeric metrics of one result section for display."""
            metrics = result.get(section, {}).get("metrics", {})
            return [
                [name.replace("_", " ").title(), f"{value:.4f}"]
                for name, value in metrics.items()
                if isinstance(value, (int, float))
            ]

        current_metrics_data = _metric_rows("current")
        simulated_metrics_data = _metric_rows("simulated")

        # Format impact summary
        impact_data = [
            [
                metric.replace("_", " ").title(),
                f"{impact.get('current', 0):.4f}",
                f"{impact.get('simulated', 0):.4f}",
                f"{impact.get('delta', 0):+.4f}",
                f"{impact.get('pct_change', 0):+.2f}%",
            ]
            for metric, impact in result.get("impact", {}).items()
        ]

        # Format stress test comparison
        stress_data = [
            [
                scenario.replace("_", " ").title(),
                f"{data.get('shock', 0):.0f}%",
                f"${data.get('current_loss', 0):,.0f}",
                f"${data.get('simulated_loss', 0):,.0f}",
                f"${data.get('improvement', 0):+,.0f}",
            ]
            for scenario, data in result.get("stress_comparison", {}).items()
        ]

        # Format recommendations
        recommendations_text = "### Recommendations\n\n" + "".join(
            f"- {rec}\n" for rec in result.get("recommendations", [])
        )

        # Format assessment
        assessment_text = f"### Overall Assessment\n\n**{result.get('overall_assessment', 'No assessment')}**"

        return (
            gr.update(visible=True),
            "Simulation complete!",
            current_metrics_data,
            simulated_metrics_data,
            impact_data,
            stress_data,
            assessment_text,
            recommendations_text,
        )
    except Exception as e:
        logger.exception(f"Test changes error: {e}")
        return _status_only(f"Error running simulation: {str(e)}")
def sync_handle_test_changes(portfolio_text, changes_text, portfolio_value, session_state):
    """Run ``handle_test_changes`` to completion from synchronous code.

    The coroutine is executed with ``asyncio.run`` and its result is
    returned unchanged.
    """
    pending = handle_test_changes(
        portfolio_text,
        changes_text,
        portfolio_value,
        session_state,
    )
    return asyncio.run(pending)
async def load_history(session_state):
    """Load analysis history from database.

    Demo users do not have persistent history - their analyses are ephemeral
    and only exist for the current session.

    Side effect: on success the loaded records are stored in the module-level
    ``HISTORY_RECORDS`` so a later row selection can look them up by index.

    Args:
        session_state: User session dictionary (may be ``None``).

    Returns:
        Tuple of (rows, status_message).  Each row is
        ``[index, date, tickers, risk_tolerance, ai_synthesis]``; ``rows`` is
        empty for demo users, unauthenticated users, empty history and errors.
    """
    global HISTORY_RECORDS

    try:
        is_demo = session_state.get("is_demo", False) if session_state else False
        # Demo users don't have persistent history
        if is_demo:
            logger.info("Demo mode: No persistent history available (analyses are ephemeral)")
            return [], "Demo mode does not save history. Sign up for a free account to save your analyses!"

        session = UserSession.from_dict(session_state)
        # Must be authenticated user with valid user_id
        if not session or not session.user_id:
            logger.error("No valid user_id for loading history")
            return [], "Please sign in to view your analysis history"

        user_id = session.user_id
        logger.info(f"Loading history for user: {user_id}")
        history = await db.get_analysis_history(user_id, limit=20)
        if not history:
            logger.info(f"No history found for user {user_id}")
            return [], "No previous analyses found"
        logger.info(f"Loaded {len(history)} analyses for user {user_id}")

        # Store analysis records globally for row selection
        HISTORY_RECORDS = history

        # Format history for dataframe.  Fields are read with ``or`` fallbacks
        # because a key that is present with a None value would otherwise
        # abort the whole listing.
        rows = []
        for i, record in enumerate(history):
            holdings = record.get("holdings_snapshot") or []
            portfolio = record.get("portfolios") or {}
            created_at = str(record.get("created_at") or "")
            rows.append([
                str(i),  # Row index for selection
                created_at.split("T")[0],  # Date only
                ", ".join(h.get("ticker", "?") for h in holdings),
                portfolio.get("risk_tolerance", "moderate"),
                record.get("ai_synthesis", ""),  # Full synthesis text (not truncated)
            ])
        return rows, f"Loaded {len(history)} previous analyses"
    except Exception as e:
        logger.exception(f"Failed to load history: {e}")
        return [], f"Error loading history: {str(e)}"
def sync_load_history(session_state):
    """Run ``load_history`` to completion from synchronous code.

    The coroutine is executed with ``asyncio.run`` and its result is
    returned unchanged.
    """
    pending = load_history(session_state)
    return asyncio.run(pending)
async def filter_history(session_state, search_query: str, date_filter: str, page: int = 1, page_size: int = 10):
    """Filter and paginate history based on search query and date range.

    Side effect: the filtered records are stored in the module-level
    ``HISTORY_RECORDS``; the index in each returned row refers to that list.

    Args:
        session_state: User session dictionary
        search_query: Search string matched (case-insensitively) against
            tickers, the creation timestamp and the risk tolerance
        date_filter: Date range filter ("all", "7d", "30d", "90d")
        page: Current page number (1-indexed); clamped to the valid range
        page_size: Number of items per page

    Returns:
        Tuple of (filtered_rows, page_info_message)
    """
    import re
    from datetime import datetime, timedelta, timezone

    global HISTORY_RECORDS

    def _created_at_utc(raw):
        """Parse an ISO-8601 timestamp into a timezone-aware datetime."""
        text = str(raw).strip()
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        # Drop fractional seconds: they are irrelevant at day granularity and
        # older ``fromisoformat`` versions reject some digit counts.
        text = re.sub(r"\.\d+", "", text, count=1)
        moment = datetime.fromisoformat(text)
        if moment.tzinfo is None:
            # NOTE(review): naive timestamps are assumed to be UTC - confirm
            # against how the database stores ``created_at``.
            moment = moment.replace(tzinfo=timezone.utc)
        return moment

    try:
        is_demo = session_state.get("is_demo", False) if session_state else False
        if is_demo:
            return [], "Page 1 of 1"

        session = UserSession.from_dict(session_state)
        if not session or not session.user_id:
            return [], "Page 1 of 1"

        # Load full history (increased limit for filtering)
        full_history = await db.get_analysis_history(session.user_id, limit=100)
        if not full_history:
            return [], "Page 1 of 1"

        # Compare aware datetimes on both sides; mixing naive and aware
        # values raises TypeError and would silently drop records.
        window_days = {"7d": 7, "30d": 30, "90d": 90}.get(date_filter)
        cutoff = None
        if window_days:
            cutoff = datetime.now(timezone.utc) - timedelta(days=window_days)

        query = (search_query or "").lower().strip()

        filtered_history = []
        for record in full_history:
            created_raw = record.get("created_at") or ""

            # Apply date filter (records without a timestamp are kept)
            if cutoff is not None and created_raw:
                try:
                    if _created_at_utc(created_raw) < cutoff:
                        continue
                except (TypeError, ValueError) as e:
                    logger.warning(f"Date parsing error: {e}")
                    continue

            # Apply search filter
            if query:
                holdings = record.get("holdings_snapshot") or []
                tickers = " ".join(str(h.get("ticker") or "") for h in holdings).lower()
                date_str = str(created_raw).lower()
                portfolio = record.get("portfolios") or {}
                risk = str(portfolio.get("risk_tolerance") or "").lower()
                # If query not found in any field, skip this record
                if query not in tickers and query not in date_str and query not in risk:
                    continue

            filtered_history.append(record)

        # Store filtered records globally for row selection
        HISTORY_RECORDS = filtered_history

        # Calculate pagination.  Coerce to int because UI number inputs may
        # deliver floats, which cannot be used as slice bounds.
        page_size = max(1, int(page_size))
        total_items = len(filtered_history)
        total_pages = max(1, (total_items + page_size - 1) // page_size)
        page = max(1, min(int(page or 1), total_pages))  # Clamp page to valid range
        start_idx = (page - 1) * page_size
        end_idx = min(start_idx + page_size, total_items)

        # Format for display
        rows = []
        for i, record in enumerate(filtered_history[start_idx:end_idx], start=start_idx):
            holdings = record.get("holdings_snapshot") or []
            portfolio = record.get("portfolios") or {}
            rows.append([
                str(i),  # Global index for selection
                str(record.get("created_at") or "").split("T")[0],
                ", ".join(h.get("ticker", "?") for h in holdings),
                portfolio.get("risk_tolerance", "moderate"),
                record.get("ai_synthesis", ""),
            ])

        page_info = f"Page {page} of {total_pages} ({total_items} total)"
        return rows, page_info
    except Exception as e:
        logger.exception(f"Failed to filter history: {e}")
        return [], "Page 1 of 1"
def sync_filter_history(session_state, search_query: str, date_filter: str, page: int = 1, page_size: int = 10):
    """Run ``filter_history`` to completion from synchronous code.

    The coroutine is executed with ``asyncio.run`` and its result is
    returned unchanged.
    """
    pending = filter_history(
        session_state,
        search_query,
        date_filter,
        page,
        page_size,
    )
    return asyncio.run(pending)
def view_historical_analysis(evt: gr.SelectData):
    """View full details of a selected historical analysis.

    The record is looked up in the module-level ``HISTORY_RECORDS`` using the
    index stored in the first column of the selected table row.

    Args:
        evt: Gradio SelectData event for the clicked table cell

    Returns:
        Markdown string with full analysis details, or an error message when
        the record cannot be resolved or formatted
    """
    global HISTORY_RECORDS

    try:
        # ``evt.value`` is only the clicked cell.  The record index lives in
        # the row's first column, so prefer the full row when the event
        # provides it and fall back to the cell value otherwise.
        row_value = getattr(evt, "row_value", None)
        raw_index = row_value[0] if row_value else evt.value
        row_index = int(float(raw_index))  # accepts "3", 3 and 3.0

        # Reject negative values too: they would silently index from the end.
        if not HISTORY_RECORDS or not 0 <= row_index < len(HISTORY_RECORDS):
            return "**Error**: Could not load analysis. Please refresh the history table."
        record = HISTORY_RECORDS[row_index]

        # Fields are read with ``or`` fallbacks because a key that is present
        # with a None value would otherwise break the formatting below.
        portfolio = record.get('portfolios') or {}
        created = str(record.get('created_at') or 'Unknown').split('T')[0]
        holdings = record.get('holdings_snapshot') or []
        recommendations = record.get('recommendations') or []

        # Blank lines separate the blocks so a "---" rule is not parsed as a
        # heading underline for the text above it.
        parts = [
            "# Analysis Details",
            "",
            f"**Date**: {created}",
            "",
            f"**Portfolio**: {portfolio.get('name', 'Unnamed Portfolio')}",
            "",
            f"**Risk Tolerance**: {portfolio.get('risk_tolerance', 'moderate')}",
            "",
            "---",
            "",
            "## Holdings Snapshot",
            "",
        ]

        # Format holdings as table
        if holdings:
            parts.append("| Ticker | Shares | Value |")
            parts.append("|--------|--------|-------|")
            for holding in holdings:
                value = float(holding.get('value') or 0)
                parts.append(
                    f"| {holding.get('ticker', '?')} | {holding.get('shares', 0)} | ${value:,.2f} |"
                )
        else:
            parts.append("*No holdings data available*")

        parts += [
            "",
            "---",
            "",
            "## AI Analysis",
            "",
            record.get('ai_synthesis') or '*No analysis available*',
            "",
            "---",
            "",
            "## Recommendations",
            "",
        ]

        if recommendations:
            parts += [f"{i}. {rec}" for i, rec in enumerate(recommendations, 1)]
        else:
            parts.append("*No recommendations available*")

        parts += [
            "",
            "---",
            "",
            f"**Model**: {record.get('model_version', 'Unknown')}",
            "",
            f"**Execution Time**: {float(record.get('execution_time_ms') or 0):,.0f}ms",
            "",
        ]
        return "\n".join(parts)
    except Exception as e:
        logger.exception(f"Failed to view historical analysis: {e}")
        return f"**Error**: Failed to load analysis details: {str(e)}"
def export_current_analysis_csv():
    """Hand back the CSV export prepared earlier.

    Reads the module-level ``LAST_EXPORT_CSV_PATH``.

    Returns:
        The stored CSV path, or None (with a warning logged) when no path
        has been stored.
    """
    global LAST_EXPORT_CSV_PATH

    csv_path = LAST_EXPORT_CSV_PATH
    if csv_path:
        logger.info(f"Returning pre-generated CSV: {csv_path}")
        return csv_path

    logger.warning("No CSV export available")
    return None
def export_current_analysis_pdf():
    """Hand back the PDF export prepared earlier.

    Reads the module-level ``LAST_EXPORT_PDF_PATH``.

    Returns:
        The stored PDF path, or None (with a warning logged) when no path
        has been stored.
    """
    global LAST_EXPORT_PDF_PATH

    pdf_path = LAST_EXPORT_PDF_PATH
    if pdf_path:
        logger.info(f"Returning pre-generated PDF: {pdf_path}")
        return pdf_path

    logger.warning("No PDF export available")
    return None
def validate_email_format(email: str) -> str:
    """Give live feedback on whether an email address looks well-formed.

    Args:
        email: Email address to validate

    Returns:
        "" for an empty value, otherwise a short valid/invalid message
    """
    import re

    if not email:
        return ""

    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    looks_valid = re.match(pattern, email) is not None

    # NOTE(review): the leading glyph is identical for both outcomes and
    # looks mis-encoded - confirm the intended symbols.
    return "β Valid email format" if looks_valid else "β Invalid email format"
def validate_password_match(password: str, confirm: str) -> str:
    """Give live feedback on whether the confirmation equals the password.

    Args:
        password: Password field value
        confirm: Confirmation field value

    Returns:
        "" while the confirmation field is empty, otherwise a short
        match/mismatch message
    """
    if not confirm:
        return ""

    # NOTE(review): the leading glyph is identical for both outcomes and
    # looks mis-encoded - confirm the intended symbols.
    return "β Passwords match" if password == confirm else "β Passwords do not match"
def check_password_strength(password: str) -> str:
    """Real-time password strength indicator.

    Scoring: up to 3 points for length (15+ -> 3, 12+ -> 2, 8+ -> 1) plus one
    point for each character class present (uppercase, lowercase, digit,
    listed symbol), for a maximum of 7.

    Args:
        password: Password to evaluate

    Returns:
        An empty string for empty input, otherwise a markdown strength label
        with feedback. A password that already has 15+ characters is never
        told to add length.
    """
    if not password:
        return ""

    # Length check (main NIST requirement)
    length = len(password)
    if length >= 15:
        score = 3  # Length is most important
    elif length >= 12:
        score = 2
    elif length >= 8:
        score = 1
    else:
        score = 0
    # The only feedback item is the length hint, needed exactly when < 15.
    feedback = [] if length >= 15 else ["15+ characters"]

    # Optional complexity (not required by NIST)
    character_classes = (r'[A-Z]', r'[a-z]', r'\d', r'[!@#$%^&*(),.?":{}|<>]')
    score += sum(1 for pattern in character_classes if re.search(pattern, password))

    hint = ("Add: " + ", ".join(feedback)) if feedback else ""

    # Generate feedback message
    if score <= 2:
        # score <= 2 implies length < 15, so a hint is always present here.
        return "🔴 **Weak** - " + hint
    if score <= 4:
        # Without a hint the password is already 15+ characters long, so do
        # not suggest adding length.
        return "🟠 **Moderate** - " + (hint or "Length requirement met")
    if score <= 6:
        return "🟡 **Good** - " + (hint or "Meets requirements")
    return "🟢 **Strong** - Exceeds requirements"
def show_history_page():
    """Build the visibility updates that switch the UI to the history page.

    Returns:
        A dict keyed by page container: ``history_page`` is made visible and
        ``task_page``, ``input_page`` and ``results_page`` are hidden.
    """
    pages = (task_page, input_page, results_page, history_page)
    return {page: gr.update(visible=page is history_page) for page in pages}
def load_tax_holdings():
    """Load current portfolio holdings into the cost basis dataframe.

    Reads the ``"holdings"`` list from the module-level
    ``LAST_ANALYSIS_STATE``. Holdings without a ticker, or whose quantity is
    missing, ``None`` or not positive, are skipped.

    Returns:
        List of rows ``[ticker, quantity, None, ""]`` - purchase price is left
        as ``None`` and purchase date as an empty string. An empty list is
        returned when no analysis state or no holdings are available.
    """
    global LAST_ANALYSIS_STATE

    if not LAST_ANALYSIS_STATE or "holdings" not in LAST_ANALYSIS_STATE:
        return []

    dataframe_data = []
    # ``or []`` / ``or 0`` guard against keys that are present but set to None.
    for holding in LAST_ANALYSIS_STATE.get("holdings") or []:
        ticker = holding.get("ticker", "")
        quantity = holding.get("quantity") or 0
        if ticker and quantity > 0:
            dataframe_data.append([ticker, quantity, None, ""])
    return dataframe_data
def calculate_tax_impact(
    filing_status: str,
    annual_income: float,
    cost_basis_method: str,
    cost_basis_data: list
):
    """Produce the markdown tax report for the holdings of the last analysis.

    Args:
        filing_status: Tax filing status
        annual_income: Annual taxable income
        cost_basis_method: Cost basis calculation method
        cost_basis_data: Rows of ticker, shares, purchase_price, purchase_date;
            either a pandas DataFrame, a list of rows, or None

    Returns:
        Formatted markdown tax analysis report. A markdown error message is
        returned instead when no analysis has been run yet or when anything
        in the calculation raises.
    """
    global LAST_ANALYSIS_STATE

    analysis_ready = bool(LAST_ANALYSIS_STATE) and "holdings" in LAST_ANALYSIS_STATE
    if not analysis_ready:
        return """# Tax Impact Analysis
**Error**: Please run a portfolio analysis first before calculating tax impact.
Use the "Analyse Portfolio" button to analyse your portfolio, then return here to view tax implications.
"""

    try:
        holdings = LAST_ANALYSIS_STATE.get("holdings", [])

        # pandas is needed both to recognise a DataFrame input and for NaN checks
        import pandas as pd

        # Normalise the incoming value to a plain list of rows
        if isinstance(cost_basis_data, pd.DataFrame):
            logger.info(f"Received DataFrame with {len(cost_basis_data)} rows")
            if cost_basis_data.empty:
                cost_basis_rows = []
                logger.info("DataFrame is empty")
            else:
                cost_basis_rows = cost_basis_data.values.tolist()
                logger.info(f"DataFrame data: {cost_basis_rows}")
        elif cost_basis_data is None:
            cost_basis_rows = []
            logger.info("No cost basis data provided")
        else:
            cost_basis_rows = cost_basis_data
            logger.info(f"Received list with {len(cost_basis_rows)} rows")

        # Keep only rows that carry a ticker, a purchase price and a purchase date
        user_cost_basis = []
        for row in cost_basis_rows:
            if len(row) < 4:
                continue

            raw_ticker = row[0]
            if pd.isna(raw_ticker) or not str(raw_ticker).strip():
                continue
            ticker = str(raw_ticker).strip().upper()

            # NaN cells are mapped to None (numbers) or "" (date)
            shares = row[1] if not pd.isna(row[1]) else None
            purchase_price = row[2] if not pd.isna(row[2]) else None
            purchase_date = str(row[3]).strip() if not pd.isna(row[3]) else ""

            if not (ticker and purchase_price is not None and purchase_date):
                continue

            user_cost_basis.append({
                "ticker": ticker,
                "shares": shares,
                "purchase_price": purchase_price,
                "purchase_date": purchase_date,
            })

        logger.info(f"Parsed {len(user_cost_basis)} valid cost basis entries: {user_cost_basis}")

        report = create_tax_analysis(
            holdings=holdings,
            filing_status=filing_status,
            annual_income=annual_income,
            cost_basis_method=cost_basis_method,
            user_cost_basis=user_cost_basis or None,
        )
        return format_tax_analysis_output(report)
    except Exception as exc:
        logger.error(f"Tax calculation error: {exc}", exc_info=True)
        return f"""# Tax Impact Analysis
**Error**: {str(exc)}
Please try again with different parameters.
"""
async def save_portfolio_input_text(user_id: str, portfolio_text: str):
    """Persist the raw portfolio text so the user can reload it later.

    Delegates to ``db.save_portfolio_input``. Any exception is logged and
    swallowed, so a failed save never interrupts the caller. Per the original
    note, the database keeps only the last 3 entries per user via a trigger
    (not visible from this function).

    Args:
        user_id: Owner of the saved input
        portfolio_text: Portfolio description exactly as entered
    """
    try:
        await db.save_portfolio_input(user_id, portfolio_text)
    except Exception as exc:
        logger.error(f"Failed to save portfolio input: {exc}")
    return None
async def get_past_portfolios_for_dropdown(user_id: str):
    """Get last 3 portfolio inputs formatted for dropdown.

    Each choice is ``(preview, index)`` where ``index`` is the position in the
    list returned by ``db.get_portfolio_inputs`` and ``preview`` is the first
    token of up to three non-empty lines of the saved description, followed by
    ``" +N more"`` when there are further lines.

    A record with a missing or ``None`` description gets a generic
    ``"Portfolio N"`` label instead of raising, so one malformed record no
    longer empties the whole dropdown.

    Args:
        user_id: User whose saved inputs are listed

    Returns:
        List of ``(label, index)`` tuples; an empty list if loading fails.
    """
    try:
        portfolios = await db.get_portfolio_inputs(user_id, limit=3)

        # Format for dropdown: (display_text, index)
        choices = []
        for i, portfolio in enumerate(portfolios):
            description = portfolio.get('description') or ''
            # Split once; every entry is non-empty after stripping
            lines = [line.strip() for line in description.split('\n') if line.strip()]

            # Preview = first whitespace-separated token of the first 3 lines
            preview = ', '.join(line.split()[0] for line in lines[:3])
            if len(lines) > 3:
                preview += f' +{len(lines) - 3} more'

            choices.append((preview or f'Portfolio {i + 1}', i))
        return choices
    except Exception as e:
        logger.error(f"Failed to load past portfolios: {e}")
        return []
async def load_portfolio_text(user_id: str, index: int):
    """Fetch one saved portfolio description by position (0 = most recent).

    Args:
        user_id: User whose saved inputs are queried
        index: Position within the (up to 3) saved inputs

    Returns:
        The entry's ``description`` value, or an empty string when the index
        is out of range or loading fails.
    """
    try:
        saved_inputs = await db.get_portfolio_inputs(user_id, limit=3)
        in_range = 0 <= index < len(saved_inputs)
        if not in_range:
            return ""
        return saved_inputs[index].get('description', '')
    except Exception as exc:
        logger.error(f"Failed to load portfolio text: {exc}")
        return ""
async def handle_analysis(session_state, portfolio_text, roast_mode, persona, request: gr.Request, progress=gr.Progress()):
    """Drive the analyse-button workflow as an async generator of UI updates.

    Steps, each yielding a dict keyed by output component:
      1. Reject unauthenticated sessions.
      2. Enforce rate limiting when ``rate_limit_middleware`` is set.
      3. Save the portfolio text for signed-in users and refresh the
         past-portfolio dropdown choices.
      4. Show the loading page.
      5. Run ``run_analysis_with_ui_update`` and show either the results page
         or the input page again, depending on the page it reports.
    """
    global LAST_EXPORT_PDF_PATH, LAST_EXPORT_CSV_PATH

    def _cleared_view(*, show_input, show_loading, message, dropdown):
        # Outputs shared by every non-result state: plots and text cleared,
        # results page hidden, export buttons left untouched.
        return {
            input_page: gr.update(visible=show_input),
            loading_page: gr.update(visible=show_loading),
            results_page: gr.update(visible=False),
            loading_message: message,
            analysis_output: "",
            performance_metrics_output: "",
            allocation_plot: None,
            risk_plot: None,
            performance_plot: None,
            correlation_plot: None,
            optimization_plot: None,
            load_past_portfolio_dropdown: dropdown,
            export_pdf_btn: gr.skip(),
            export_csv_btn: gr.skip()
        }

    # Check authentication
    # NOTE(review): the two rejection states below hide every page container;
    # whether the message is then visible depends on where loading_message
    # sits in the layout - confirm.
    if not check_authentication(session_state):
        yield _cleared_view(
            show_input=False,
            show_loading=False,
            message="β Please sign in to analyse your portfolio",
            dropdown=gr.skip(),
        )
        return

    # Enforce rate limiting
    if rate_limit_middleware:
        try:
            rate_limit_middleware.enforce(request, session_state=session_state)
        except Exception as e:
            logger.warning(f"Rate limit exceeded: {e}")
            yield _cleared_view(
                show_input=False,
                show_loading=False,
                message=f"β {str(e)}",
                dropdown=gr.skip(),
            )
            return

    # Auto-save portfolio input for quick reload (keep last 3)
    session = UserSession.from_dict(session_state)
    dropdown_choices = []
    if session and session.user_id and portfolio_text.strip():
        await save_portfolio_input_text(session.user_id, portfolio_text)
        dropdown_choices = await get_past_portfolios_for_dropdown(session.user_id)

    def _dropdown_update():
        # Only touch the dropdown when there is something to show
        if dropdown_choices:
            return gr.update(choices=dropdown_choices)
        return gr.skip()

    # Show loading page immediately
    yield _cleared_view(
        show_input=False,
        show_loading=True,
        message=random.choice(LOADING_MESSAGES),
        dropdown=_dropdown_update(),
    )

    # Run analysis with progress updates
    page, analysis, perf_metrics, alloc, risk, perf, corr, opt, audio_btn = await run_analysis_with_ui_update(
        session_state, portfolio_text, roast_mode, persona, progress
    )

    # Anything other than "results" sends the user back to the input page
    if page != "results":
        back_to_input = _cleared_view(
            show_input=True,
            show_loading=False,
            message="",
            dropdown=gr.skip(),
        )
        # The analysis text (presumably an error message here - confirm) is
        # still shown, and the audio button still receives its value.
        back_to_input[analysis_output] = analysis
        back_to_input[analysis_audio_btn] = audio_btn
        yield back_to_input
        return

    # Show results, including the export paths stored in module globals
    yield {
        input_page: gr.update(visible=False),
        loading_page: gr.update(visible=False),
        results_page: gr.update(visible=True),
        loading_message: "Analysis complete!",
        analysis_output: analysis,
        performance_metrics_output: perf_metrics,
        allocation_plot: alloc,
        risk_plot: risk,
        performance_plot: perf,
        correlation_plot: corr,
        optimization_plot: opt,
        analysis_audio_btn: audio_btn,
        load_past_portfolio_dropdown: _dropdown_update(),
        export_pdf_btn: LAST_EXPORT_PDF_PATH,
        export_csv_btn: LAST_EXPORT_CSV_PATH
    }
# Wire up live preview on input change:
# every change of the portfolio text re-renders preview_output.
portfolio_input.change(
    update_live_preview,
    inputs=[portfolio_input],
    outputs=[preview_output]
)
# Wire up refresh prices button:
# same input/output pair as the live preview, but handled by
# fetch_and_update_preview.
refresh_prices_btn.click(
    fetch_and_update_preview,
    inputs=[portfolio_input],
    outputs=[preview_output]
)
# Wire up past portfolio dropdown to load selected portfolio
async def handle_load_past_portfolio(index, session):
    """Return the saved portfolio text for the chosen dropdown entry.

    Args:
        index: Selected dropdown value (position of the saved input), or None
        session: Session state dict, parsed with ``UserSession.from_dict``

    Returns:
        ``gr.skip()`` when nothing is selected, an empty string when the
        session has no user id, otherwise the text from
        ``load_portfolio_text``.
    """
    if index is None:
        return gr.skip()

    session_obj = UserSession.from_dict(session)
    user_id = session_obj.user_id if session_obj else None
    if not user_id:
        return ""

    return await load_portfolio_text(user_id, index)
# Selecting a past portfolio writes its text into the input box.
load_past_portfolio_dropdown.change(
    handle_load_past_portfolio,
    inputs=[load_past_portfolio_dropdown, session_state],
    outputs=[portfolio_input]
)
# Main analyse action. handle_analysis yields dicts keyed by component, so
# every component it may update has to be listed in outputs.
analyse_btn.click(
    handle_analysis,
    inputs=[session_state, portfolio_input, roast_mode_toggle, persona_dropdown],
    outputs=[
        input_page,
        loading_page,
        results_page,
        loading_message,
        analysis_output,
        performance_metrics_output,
        allocation_plot,
        risk_plot,
        performance_plot,
        correlation_plot,
        optimization_plot,
        load_past_portfolio_dropdown,
        export_pdf_btn,
        export_csv_btn,
        analysis_audio_btn  # Audio button
    ],
    show_progress="full"
)
# Export buttons - download directly (DownloadButton triggers browser download automatically)
# Each handler returns the stored export path (or None) back into its own button.
export_pdf_btn.click(
    export_current_analysis_pdf,
    outputs=export_pdf_btn
)
export_csv_btn.click(
    export_current_analysis_csv,
    outputs=export_csv_btn
)
# Back button from input page to task page
input_back_btn.click(
    show_task_page,
    outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page]
)
# Auto-load history when History tab is selected
def load_history_if_selected(evt: gr.SelectData, session):
    """Load history for the results view only when its History tab is opened.

    Args:
        evt: Tab selection event; ``evt.index`` is the selected tab position
        session: Session state passed through to ``sync_load_history``

    Returns:
        The result of ``sync_load_history(session)`` for the History tab,
        otherwise a pair of ``gr.skip()`` values leaving both outputs as-is.
    """
    # 0-indexed tab order: Analysis, Dashboard, Tax, Stress, History
    history_tab_index = 4
    if evt.index != history_tab_index:
        return gr.skip(), gr.skip()
    return sync_load_history(session)
# Tab selection on the results page; the handler only loads history when
# the History tab is the one selected.
results_tabs.select(
    load_history_if_selected,
    inputs=[session_state],
    outputs=[history_table_results, history_details_output_results]
)
# NOTE(review): show_input_page is wired with 7 outputs on task_analyse_btn
# but only 4 here - confirm its return value is valid for both output lists.
back_to_input_btn.click(
    show_input_page,
    outputs=[task_page, input_page, results_page, history_page]
)
# History table selection handlers
# The same handler serves both the standalone table and the results-tab
# table; no explicit inputs are passed.
history_table.select(
    view_historical_analysis,
    outputs=[history_details_output]
)
history_table_results.select(
    view_historical_analysis,
    outputs=[history_details_output_results]
)
# History search and filter handlers (standalone history page)
# A new search term or date filter must start from page 1. The page state is
# reset FIRST and the filter runs in the chained step, so the query uses
# page 1 instead of whatever page was showing before (the same
# update-then-filter order the pagination handlers below use).
history_search.change(
    lambda: 1,  # Reset to page 1 on search
    outputs=[history_current_page],
    show_api=False
).then(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search, history_date_filter, history_current_page],
    outputs=[history_table, history_page_info],
    show_api=False
)
history_date_filter.change(
    lambda: 1,  # Reset to page 1 on filter change
    outputs=[history_current_page],
    show_api=False
).then(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search, history_date_filter, history_current_page],
    outputs=[history_table, history_page_info],
    show_api=False
)
# Refresh re-runs the current query on the current page.
history_refresh_btn.click(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search, history_date_filter, history_current_page],
    outputs=[history_table, history_page_info],
    show_api=False
)
# Pagination handlers (standalone)
# Step 1 updates the page number, step 2 reloads the table for that page.
history_prev_btn.click(
    lambda page: max(1, page - 1),  # never go below page 1
    inputs=[history_current_page],
    outputs=[history_current_page],
    show_api=False
).then(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search, history_date_filter, history_current_page],
    outputs=[history_table, history_page_info],
    show_api=False
)
# NOTE(review): there is no upper bound on the page number here; presumably
# sync_filter_history copes with pages past the end - confirm.
history_next_btn.click(
    lambda page: page + 1,
    inputs=[history_current_page],
    outputs=[history_current_page],
    show_api=False
).then(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search, history_date_filter, history_current_page],
    outputs=[history_table, history_page_info],
    show_api=False
)
# History search and filter handlers (results page tab)
# A new search term or date filter must start from page 1. The page state is
# reset FIRST and the filter runs in the chained step, so the query uses
# page 1 instead of whatever page was showing before (the same
# update-then-filter order the pagination handlers below use).
history_search_results.change(
    lambda: 1,  # Reset to page 1 on search
    outputs=[history_current_page_results],
    show_api=False
).then(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results],
    outputs=[history_table_results, history_page_info_results],
    show_api=False
)
history_date_filter_results.change(
    lambda: 1,  # Reset to page 1 on filter change
    outputs=[history_current_page_results],
    show_api=False
).then(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results],
    outputs=[history_table_results, history_page_info_results],
    show_api=False
)
# Refresh re-runs the current query on the current page.
history_refresh_btn_results.click(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results],
    outputs=[history_table_results, history_page_info_results],
    show_api=False
)
# Pagination handlers (results page)
# Step 1 updates the page number, step 2 reloads the table for that page.
history_prev_btn_results.click(
    lambda page: max(1, page - 1),  # never go below page 1
    inputs=[history_current_page_results],
    outputs=[history_current_page_results],
    show_api=False
).then(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results],
    outputs=[history_table_results, history_page_info_results],
    show_api=False
)
# NOTE(review): there is no upper bound on the page number here; presumably
# sync_filter_history copes with pages past the end - confirm.
history_next_btn_results.click(
    lambda page: page + 1,
    inputs=[history_current_page_results],
    outputs=[history_current_page_results],
    show_api=False
).then(
    lambda sess, query, date_filter, page: sync_filter_history(sess, query, date_filter, page),
    inputs=[session_state, history_search_results, history_date_filter_results, history_current_page_results],
    outputs=[history_table_results, history_page_info_results],
    show_api=False
)
# Tax load holdings button:
# fills the cost basis table from the holdings of the last analysis.
tax_load_holdings_btn.click(
    load_tax_holdings,
    inputs=[],
    outputs=[tax_cost_basis_input]
)
# Tax calculator button (Enhancement #5)
# Input order must match calculate_tax_impact's parameters:
# filing_status, annual_income, cost_basis_method, cost_basis_data.
tax_calculate_btn.click(
    calculate_tax_impact,
    inputs=[tax_filing_status, tax_annual_income, tax_cost_basis_method, tax_cost_basis_input],
    outputs=[tax_analysis_output]
)
# Wire up stress test button:
# run_stress_test feeds one summary component and four plot components.
stress_test_btn.click(
    run_stress_test,
    inputs=[stress_scenario_dropdown, stress_n_sims, stress_horizon],
    outputs=[
        stress_summary,
        stress_dashboard_plot,
        stress_mc_plot,
        stress_scenario_plot,
        stress_drawdown_plot
    ]
)
# Authentication event handlers
async def handle_login(email: str, password: str, current_session: Dict):
    """Authenticate a user and build the 8 values for the login outputs.

    Args:
        email: Login email
        password: Login password
        current_session: Session state to keep when login does not succeed

    Returns:
        An 8-tuple: session state, status message, then updates for the login
        form, task selection, user info, past-portfolio dropdown, logout
        button and sidebar. When login does not succeed the six UI updates
        are no-op ``gr.update()`` values.
    """

    def _no_ui_change(status_message):
        # Keep the current session and leave all six UI components untouched
        return (current_session, status_message) + tuple(gr.update() for _ in range(6))

    if not email or not password:
        return _no_ui_change("Please enter email and password")

    success, message, session = await auth.login(email, password)
    if not (success and session):
        # On failure, don't show logout_btn or sidebar
        return _no_ui_change(f"β {message}")

    # Update session state
    session_dict = session.to_dict()
    # Get past portfolio dropdown choices
    dropdown_choices = await get_past_portfolios_for_dropdown(session.user_id)

    hide_login_form = gr.update(visible=False)
    show_task_selection = gr.update(visible=True)
    user_info_update = gr.update(value=f"π€ {session.username} | {session.email}")
    dropdown_update = gr.update(choices=dropdown_choices)
    show_logout_button = gr.update(visible=True)
    show_sidebar = gr.update(visible=True)
    return (
        session_dict,
        f"β {message}",
        hide_login_form,
        show_task_selection,
        user_info_update,
        dropdown_update,
        show_logout_button,
        show_sidebar,
    )
async def handle_signup(email: str, password: str, confirm_password: str, username: str, current_session: Dict):
    """Validate the signup form and register the user via ``auth.signup``.

    Args:
        email: Signup email
        password: Chosen password
        confirm_password: Repeated password, must equal ``password``
        username: Display name passed to ``auth.signup``
        current_session: Session state to keep unless signup returns a session

    Returns:
        ``(session_state, message)``. The session state is replaced only when
        signup succeeds and returns a session.
    """
    # Validation
    if not email or not password:
        return current_session, "β Please enter email and password"

    if password != confirm_password:
        return current_session, "β Passwords do not match"

    # NIST SP 800-63B-4: Minimum 15 characters for single-factor auth
    # No composition rules (uppercase, numbers, symbols) required
    if len(password) < 15:
        return current_session, "β Password must be at least 15 characters (NIST 2024 requirement)"

    success, message, session = await auth.signup(email, password, username)

    # The three outcomes are kept as separate returns on purpose: their marker
    # glyphs look mis-encoded in this copy and may differ in the original.
    if not success:
        return current_session, f"β {message}"
    if session:
        # Auto-login after signup (if email confirmation disabled)
        return session.to_dict(), f"β {message}"
    return current_session, f"β {message}"
async def handle_logout(current_session: Dict):
    """Handle user logout and reset all UI components.

    Security: Clears all global state and user-specific cache to prevent
    data leakage between user sessions. This includes the export paths
    (``LAST_EXPORT_PDF_PATH`` / ``LAST_EXPORT_CSV_PATH``): the export handlers
    return those globals directly, so leaving them set would let the next
    session download the previous user's report.

    Args:
        current_session: Session state dict of the user logging out

    Returns:
        A 53-tuple whose positions match the ``outputs`` list of
        ``logout_btn.click`` one-to-one; keep both in sync when editing.
    """
    global LAST_ANALYSIS_STATE, HISTORY_RECORDS, LAST_STRESS_TEST
    global LAST_EXPORT_PDF_PATH, LAST_EXPORT_CSV_PATH

    session = UserSession.from_dict(current_session)
    # Extract user ID before clearing session (for cache cleanup)
    user_id = session.user_id if session else None

    # 1. Clear backend authentication
    success, message = await auth.logout(session)

    # 2. Clear global state variables (CRITICAL for data security)
    LAST_ANALYSIS_STATE = None
    HISTORY_RECORDS = []
    LAST_STRESS_TEST = None
    # Export paths point at the previous user's generated files
    LAST_EXPORT_PDF_PATH = None
    LAST_EXPORT_CSV_PATH = None

    # 3. Clear user-specific cache entries
    if user_id:
        try:
            from backend.caching.factory import cache_manager
            # Clear user-specific data from cache
            cache_manager.invalidation.invalidate_by_event("user_update", user_id)
            logger.info(f"Cleared cache for user {user_id}")
        except Exception as e:
            # Best effort: a cache failure must not block the logout itself
            logger.error(f"Failed to clear user cache on logout: {e}")

    # 4. Clear session and reset ALL UI components to initial state
    return (
        {},  # Clear session state
        f"β {message}",  # Login message
        gr.update(visible=True),  # Show login container
        gr.update(visible=False),  # Hide task page
        gr.update(visible=False),  # Hide input page
        gr.update(value="Not logged in"),  # Clear user info
        gr.update(visible=False),  # Hide logout button
        gr.update(visible=False),  # Hide sidebar
        gr.update(visible=False),  # Hide results page
        gr.update(visible=False),  # Hide loading page
        gr.update(visible=False),  # Hide history page
        # Clear all analysis outputs
        gr.update(value=""),  # analysis_output
        gr.update(value=None),  # allocation_plot
        gr.update(value=None),  # risk_plot
        gr.update(value=None),  # performance_plot
        gr.update(value=None),  # correlation_plot
        gr.update(value=None),  # optimization_plot
        # Clear history
        gr.update(value=[]),  # history_table
        # Clear password reset modals
        gr.update(visible=False),  # password_reset_modal
        gr.update(visible=False),  # password_update_modal
        # Clear user input data (SECURITY: prevent data leakage)
        gr.update(value=""),  # portfolio_input
        gr.update(choices=[], value=None),  # load_past_portfolio_dropdown
        # Clear build page
        gr.update(visible=False),  # Hide build_page
        gr.update(visible=False),  # Hide build_results_container
        gr.update(value=[]),  # Clear build_agent_chat
        gr.update(value=[]),  # Clear build_goals
        gr.update(value=5),  # Reset build_risk_tolerance to middle value
        gr.update(value=""),  # Clear build_constraints
        gr.update(value=""),  # Clear build_status
        # Clear compare page
        gr.update(visible=False),  # Hide compare_page
        gr.update(visible=False),  # Hide compare_results_container
        gr.update(value=[]),  # Clear compare_debate_chat
        gr.update(value=""),  # Clear compare_portfolio_input
        gr.update(value=""),  # Clear compare_status
        gr.update(value=""),  # Clear compare_bull_case
        gr.update(value=0),  # Reset compare_bull_confidence
        gr.update(value=""),  # Clear compare_bear_case
        gr.update(value=0),  # Reset compare_bear_confidence
        gr.update(value=""),  # Clear compare_consensus
        gr.update(value=""),  # Clear compare_stance
        gr.update(value=[]),  # Clear compare_debate_transcript
        # Clear test page
        gr.update(visible=False),  # Hide test_page
        gr.update(visible=False),  # Hide test_results_container
        gr.update(value=""),  # Clear test_portfolio_input
        gr.update(value=""),  # Clear test_changes_input
        gr.update(value=100000),  # Reset test_portfolio_value to default
        gr.update(value=""),  # Clear test_status
        gr.update(value=[]),  # Clear test_current_metrics
        gr.update(value=[]),  # Clear test_simulated_metrics
        gr.update(value=[]),  # Clear test_impact_summary
        gr.update(value=[]),  # Clear test_stress_comparison
        gr.update(value=""),  # Clear test_assessment
        gr.update(value=""),  # Clear test_recommendations
    )
def sync_login(email: str, password: str, current_session: Dict):
    """Run ``handle_login`` to completion and return its result."""
    login_coro = handle_login(email, password, current_session)
    return asyncio.run(login_coro)
def sync_signup(email: str, password: str, confirm_password: str, username: str, current_session: Dict):
    """Run ``handle_signup`` to completion and return its result."""
    signup_coro = handle_signup(email, password, confirm_password, username, current_session)
    return asyncio.run(signup_coro)
def sync_logout(current_session: Dict):
    """Run ``handle_logout`` to completion and return its result."""
    logout_coro = handle_logout(current_session)
    return asyncio.run(logout_coro)
async def handle_password_reset(email: str):
    """Request a password reset email through ``auth.request_password_reset``.

    Args:
        email: Address to send the reset link to

    Returns:
        ``("", message)`` - the first element is always an empty string (it is
        wired to the reset email field), the second is the status text.
    """
    if not email:
        return "", "β Please enter your email address"

    success, message = await auth.request_password_reset(email)
    if not success:
        return "", message

    # Simple success message
    return "", f"{message}\n\nCheck your email and click the reset link to set your new password."
def sync_password_reset(email: str):
    """Run ``handle_password_reset`` to completion and return its result."""
    reset_coro = handle_password_reset(email)
    return asyncio.run(reset_coro)
async def handle_password_update(password: str, confirm: str, email: str, token_hash: str):
    """Set a new password after a reset, using the recovery token_hash.

    Args:
        password: New password
        confirm: Repeated new password, must equal ``password``
        email: Account email
        token_hash: Recovery token hash forwarded to ``auth.update_password``

    Returns:
        A status message: a validation error, or the message returned by
        ``auth.update_password``.
    """
    if not email:
        return "β Please enter your email address"

    if not (password and confirm):
        return "β Please enter both password fields"

    if password != confirm:
        return "β Passwords do not match"

    # NIST SP 800-63B-4: Minimum 15 characters for single-factor auth
    if len(password) < 15:
        return "β Password must be at least 15 characters (NIST 2024 requirement)"

    # Only the message is surfaced; the success flag is not used here
    _success, message = await auth.update_password(password, email, token_hash)
    return message
def sync_password_update(password: str, confirm: str, email: str, token_hash: str):
    """Run ``handle_password_update`` to completion and return its result."""
    update_coro = handle_password_update(password, confirm, email, token_hash)
    return asyncio.run(update_coro)
# Connect authentication handlers
# The outputs order must match the 8-tuple returned by handle_login.
login_btn.click(
    sync_login,
    inputs=[login_email, login_password, session_state],
    outputs=[session_state, login_message, login_container, task_page, user_info, load_past_portfolio_dropdown, logout_btn, sidebar]
)
# Signup only updates the session state and the signup message.
signup_btn.click(
    sync_signup,
    inputs=[signup_email, signup_password, signup_confirm, signup_username, session_state],
    outputs=[session_state, signup_message]
)
# Real-time email validation
signup_email.change(
    validate_email_format,
    inputs=[signup_email],
    outputs=[signup_email_validation]
)
# Real-time password strength meter
signup_password.change(
    check_password_strength,
    inputs=[signup_password],
    outputs=[signup_password_strength]
)
# Real-time password match validation
# Triggered by changes of the confirmation field only.
signup_confirm.change(
    validate_password_match,
    inputs=[signup_password, signup_confirm],
    outputs=[signup_password_match]
)
# Password reset handlers
# "Forgot password": hide the login form, show the reset modal.
forgot_password_btn.click(
    lambda: (gr.update(visible=False), gr.update(visible=True)),
    outputs=[login_container, password_reset_modal],
    show_api=False
)
# Cancel in the reset modal: back to the login form, message cleared.
reset_cancel_btn.click(
    lambda: (gr.update(visible=True), gr.update(visible=False), ""),
    outputs=[login_container, password_reset_modal, reset_message],
    show_api=False
)
# Cancel in the update modal: back to the login form, message cleared.
update_cancel_btn.click(
    lambda: (gr.update(visible=True), gr.update(visible=False), ""),
    outputs=[login_container, password_update_modal, update_password_message],
    show_api=False
)
# Submit the reset request; the handler also writes "" back to reset_email.
reset_submit_btn.click(
    sync_password_reset,
    inputs=[reset_email],
    outputs=[reset_email, reset_message]
)
# Show password update modal when recovery token_hash is detected
# Truthy token: hide login form, show update modal; falsy token: the reverse.
recovery_token_hash.change(
    fn=lambda token: (
        gr.update(visible=False) if token else gr.update(visible=True),
        gr.update(visible=True) if token else gr.update(visible=False)
    ),
    inputs=[recovery_token_hash],
    outputs=[login_container, password_update_modal],
    show_api=False
)
# Password update handler (separate modal after email link click)
update_password_btn.click(
    sync_password_update,
    inputs=[new_password, confirm_new_password, recovery_email, recovery_token_hash],
    outputs=[update_password_message]
).then(
    # On success, hide modal and show login form
    # Success is detected by a marker substring in the status message.
    # NOTE(review): the marker glyph looks mis-encoded in this copy and
    # matches the visible prefix of the failure messages too - confirm the
    # intended symbol.
    lambda msg: (
        gr.update(visible=False) if "β " in msg else gr.update(visible=True),
        gr.update(visible=True) if "β " in msg else gr.update(visible=False)
    ),
    inputs=[update_password_message],
    outputs=[password_update_modal, login_container],
    show_api=False
)
def handle_demo_mode(current_session: Dict):
    """Start an anonymous demo session and switch the UI to task selection.

    Args:
        current_session: Existing session state (not read; the demo session
            replaces it)

    Returns:
        A 7-tuple: demo session dict, cleared demo message, then updates for
        the login container, task selection, user info text, logout button
        and sidebar.
    """
    # Create a demo session (not authenticated); no user ID for anonymous
    demo_session = dict(authenticated=False, is_demo=True, user_id=None)

    cleared_demo_message = ""
    hide_login_container = gr.update(visible=False)
    show_task_selection = gr.update(visible=True)
    user_info_text = "Demo Mode - 1 free analysis per day"
    hide_logout_button = gr.update(visible=False)  # no logout in demo mode
    show_sidebar = gr.update(visible=True)

    return (
        demo_session,
        cleared_demo_message,
        hide_login_container,
        show_task_selection,
        user_info_text,
        hide_logout_button,
        show_sidebar,
    )
# Demo mode: the outputs order must match the 7-tuple from handle_demo_mode.
demo_btn.click(
    handle_demo_mode,
    inputs=[session_state],
    outputs=[session_state, demo_message, login_container, task_page, user_info, logout_btn, sidebar]
)
# Logout: handle_logout returns a positional tuple, so this list must stay
# aligned one-to-one with the values it returns (53 entries).
logout_btn.click(
    sync_logout,
    inputs=[session_state],
    outputs=[
        session_state, login_message, login_container, task_page, input_page, user_info,
        logout_btn, sidebar, results_page, loading_page, history_page,
        analysis_output, allocation_plot, risk_plot, performance_plot,
        correlation_plot, optimization_plot, history_table,
        password_reset_modal, password_update_modal,
        portfolio_input, load_past_portfolio_dropdown,
        # Build page components
        build_page, build_results_container, build_agent_chat,
        build_goals, build_risk_tolerance, build_constraints, build_status,
        # Compare page components
        compare_page, compare_results_container, compare_debate_chat, compare_portfolio_input, compare_status,
        compare_bull_case, compare_bull_confidence, compare_bear_case, compare_bear_confidence,
        compare_consensus, compare_stance, compare_debate_transcript,
        # Test page components
        test_page, test_results_container, test_portfolio_input, test_changes_input,
        test_portfolio_value, test_status, test_current_metrics, test_simulated_metrics,
        test_impact_summary, test_stress_comparison, test_assessment, test_recommendations
    ]
)
# Navigation event handlers
# All navigation handlers below share the same seven page containers.
nav_new_analysis.click(
    show_task_page,
    outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page]
)
# Task card event handlers
task_analyse_btn.click(
    show_input_page,
    outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page]
)
task_build_btn.click(
    show_build_page,
    outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page]
)
task_compare_btn.click(
    show_compare_page,
    outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page]
)
| task_test_btn.click( | |
| show_test_page, | |
| outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] | |
| ) | |
| # Build page event handlers | |
| build_back_btn.click( | |
| show_task_page, | |
| outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] | |
| ) | |
| build_submit_btn.click( | |
| handle_build_portfolio, | |
| inputs=[build_goals, build_risk_tolerance, build_constraints, session_state], | |
| outputs=[ | |
| build_agent_chat, | |
| build_results_container, | |
| build_status, | |
| build_audio_btn # Audio button | |
| ] | |
| ) | |
| build_regenerate_btn.click( | |
| handle_build_portfolio, | |
| inputs=[build_goals, build_risk_tolerance, build_constraints, session_state], | |
| outputs=[ | |
| build_agent_chat, | |
| build_results_container, | |
| build_status, | |
| build_audio_btn | |
| ] | |
| ) | |
| # Compare page event handlers | |
| compare_back_btn.click( | |
| show_task_page, | |
| outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] | |
| ) | |
| compare_submit_btn.click( | |
| handle_compare_portfolio, | |
| inputs=[compare_portfolio_input, session_state], | |
| outputs=[ | |
| compare_debate_chat, | |
| compare_results_container, | |
| compare_status, | |
| compare_bull_case, | |
| compare_bull_confidence, | |
| compare_bear_case, | |
| compare_bear_confidence, | |
| compare_consensus, | |
| compare_stance, | |
| compare_debate_transcript, | |
| compare_audio_btn # Audio button | |
| ] | |
| ) | |
| # Test page event handlers | |
| test_back_btn.click( | |
| show_task_page, | |
| outputs=[task_page, input_page, results_page, history_page, build_page, compare_page, test_page] | |
| ) | |
| test_submit_btn.click( | |
| sync_handle_test_changes, | |
| inputs=[test_portfolio_input, test_changes_input, test_portfolio_value, session_state], | |
| outputs=[ | |
| test_results_container, | |
| test_status, | |
| test_current_metrics, | |
| test_simulated_metrics, | |
| test_impact_summary, | |
| test_stress_comparison, | |
| test_assessment, | |
| test_recommendations | |
| ] | |
| ) | |
| nav_view_history.click( | |
| show_history_page, | |
| outputs=[task_page, input_page, results_page, history_page] | |
| ).then( | |
| sync_load_history, | |
| inputs=[session_state], | |
| outputs=[history_table, history_details_output] | |
| ) | |
| # Sidebar sign out button (mirrors main logout button) | |
| nav_signout_btn.click( | |
| sync_logout, | |
| inputs=[session_state], | |
| outputs=[ | |
| session_state, login_message, login_container, task_page, input_page, user_info, | |
| logout_btn, sidebar, results_page, loading_page, history_page, | |
| analysis_output, allocation_plot, risk_plot, performance_plot, | |
| correlation_plot, optimization_plot, history_table, | |
| password_reset_modal, password_update_modal, | |
| portfolio_input, load_past_portfolio_dropdown, | |
| # Build page components | |
| build_page, build_results_container, build_agent_chat, | |
| build_goals, build_risk_tolerance, build_constraints, build_status, | |
| # Compare page components | |
| compare_page, compare_results_container, compare_debate_chat, compare_portfolio_input, compare_status, | |
| compare_bull_case, compare_bull_confidence, compare_bear_case, compare_bear_confidence, | |
| compare_consensus, compare_stance, compare_debate_transcript, | |
| # Test page components | |
| test_page, test_results_container, test_portfolio_input, test_changes_input, | |
| test_portfolio_value, test_status, test_current_metrics, test_simulated_metrics, | |
| test_impact_summary, test_stress_comparison, test_assessment, test_recommendations | |
| ] | |
| ) | |
| # ============================================================ | |
| # AUDIO BUTTON EVENT HANDLERS | |
| # ============================================================ | |
| # Use event chaining to show loading state during audio generation: | |
| # 1. Disable button and change text to "Generating..." | |
| # 2. Run async audio generation with progress indicator | |
| # 3. Restore button to original state | |
| # Analysis audio button | |
| analysis_audio_btn.click( | |
| fn=lambda: gr.update(value="Generating...", interactive=False), | |
| outputs=analysis_audio_btn | |
| ).then( | |
| fn=generate_analysis_audio, | |
| inputs=[], | |
| outputs=[analysis_audio_player, analysis_audio_btn], | |
| show_progress="minimal" | |
| ).then( | |
| fn=lambda: gr.update(value="π Listen to Analysis", interactive=True), | |
| outputs=analysis_audio_btn | |
| ) | |
| # Build portfolio audio button | |
| build_audio_btn.click( | |
| fn=lambda: gr.update(value="Generating...", interactive=False), | |
| outputs=build_audio_btn | |
| ).then( | |
| fn=generate_build_audio, | |
| inputs=[], | |
| outputs=[build_audio_player, build_audio_btn], | |
| show_progress="minimal" | |
| ).then( | |
| fn=lambda: gr.update(value="π Listen to Portfolio", interactive=True), | |
| outputs=build_audio_btn | |
| ) | |
| # Compare/debate audio button | |
| compare_audio_btn.click( | |
| fn=lambda: gr.update(value="Generating...", interactive=False), | |
| outputs=compare_audio_btn | |
| ).then( | |
| fn=generate_debate_audio, | |
| inputs=[], | |
| outputs=[compare_audio_player, compare_audio_btn], | |
| show_progress="minimal" | |
| ).then( | |
| fn=lambda: gr.update(value="π Listen to Debate", interactive=True), | |
| outputs=compare_audio_btn | |
| ) | |
# MCP tool registrations (API/MCP only - no UI components).
# Each entry names an attribute of `mcp_tools`; the function is registered
# under an api_name identical to that attribute name. Groups and the names
# within them are registered in the order listed.
_mcp_tool_groups = {
    "market data": (
        "market_get_quote",
        "market_get_historical_data",
        "market_get_fundamentals",
        "market_get_company_profile",
        "market_get_income_statement",
        "market_get_balance_sheet",
        "market_get_cash_flow_statement",
        "market_get_financial_ratios",
        "market_get_key_metrics",
        "market_get_economic_series",
    ),
    "technical analysis": (
        "technical_get_indicators",
        "technical_extract_features",
        "technical_normalise_features",
        "technical_select_features",
        "technical_compute_feature_vector",
    ),
    "portfolio optimisation": (
        "portfolio_optimize_hrp",
        "portfolio_optimize_black_litterman",
        "portfolio_optimize_mean_variance",
    ),
    "risk analysis": (
        "risk_analyze",
        "risk_forecast_volatility_garch",
    ),
    "ml forecasting": (
        "ml_forecast_ensemble",
    ),
    "sentiment analysis": (
        "sentiment_get_news",
    ),
}
for _mcp_tool_names in _mcp_tool_groups.values():
    for _mcp_tool_name in _mcp_tool_names:
        gr.api(getattr(mcp_tools, _mcp_tool_name), api_name=_mcp_tool_name)
| return demo | |
if __name__ == "__main__":
    # Build the Blocks app; the module-level name `demo` is kept unchanged.
    demo = create_interface()

    # SPACE_ID is present in the environment when running on HF Spaces;
    # use it to suppress verbose console output there for cleaner logs.
    is_hf_space = "SPACE_ID" in os.environ

    launch_options = dict(
        server_name="0.0.0.0",
        server_port=7860,
        share=False,
        quiet=is_hf_space,
        mcp_server=True,  # native MCP server at /gradio_api/mcp/
        allowed_paths=["/tmp"],  # lets export files in the temp directory be served
    )

    # Enable the request queue, then start the server with the options above.
    demo.queue().launch(**launch_options)