| """Tests for P1 hackathon features. | |
| Tests cover: | |
| - Advanced risk metrics (Information Ratio, Calmar Ratio, Ulcer Index) | |
| - GARCH volatility forecasting | |
| - Ensemble predictor with Chronos | |
| - Workflow Phase 2.5 integration | |
| """ | |
| import pytest | |
| import numpy as np | |
| import pandas as pd | |
| from decimal import Decimal | |
| from unittest.mock import Mock, patch, AsyncMock | |
| from typing import Dict, Any | |

# Advanced Risk Metrics Tests

class TestAdvancedRiskMetrics:
    """Test advanced performance metrics in Risk Analyzer MCP."""

    @pytest.fixture
    def sample_returns(self):
        """Sample returns data for testing."""
        np.random.seed(42)
        # Generate returns with known properties
        returns = pd.Series(np.random.normal(0.001, 0.02, 252))  # ~0.1% daily, 2% vol
        return returns

    @pytest.fixture
    def sample_prices(self):
        """Sample price data for testing."""
        # Create a price series with drawdowns
        prices = pd.Series([100, 105, 103, 108, 102, 98, 95, 100, 105, 110])
        return prices

    def test_ulcer_index_calculation(self, sample_prices):
        """Test Ulcer Index calculation."""
        from backend.mcp_servers.risk_analyzer_mcp import _calculate_ulcer_index

        ulcer = _calculate_ulcer_index(sample_prices, lookback_period=5)

        # Ulcer Index should be non-negative
        assert ulcer >= 0
        # Should be a float
        assert isinstance(ulcer, float)

    def test_ulcer_index_no_drawdown(self):
        """Test Ulcer Index with no drawdowns."""
        from backend.mcp_servers.risk_analyzer_mcp import _calculate_ulcer_index

        # Monotonically increasing prices
        prices = pd.Series([100, 101, 102, 103, 104, 105])
        ulcer = _calculate_ulcer_index(prices, lookback_period=3)

        # Should be close to zero (no drawdowns)
        assert ulcer < 1.0
    def test_calmar_ratio_calculation(self, sample_returns):
        """Test Calmar Ratio calculation."""
        from backend.mcp_servers.risk_analyzer_mcp import _calculate_calmar_ratio

        calmar = _calculate_calmar_ratio(sample_returns, risk_free_rate=0.02)

        # Calmar ratio should be a float
        assert isinstance(calmar, float)
        # With positive overall returns it should be non-zero (when a max drawdown exists)
        assert calmar != 0 or sample_returns.min() >= 0
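
    # Reference formula, for context (assumption: the helper uses the usual
    # convention): Calmar Ratio = annualised (excess) return / |max drawdown|;
    # the risk_free_rate argument suggests excess return in the numerator.
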
    def test_information_ratio_with_benchmark(self, sample_returns):
        """Test Information Ratio with benchmark."""
        from backend.mcp_servers.risk_analyzer_mcp import _calculate_information_ratio

        # Create benchmark returns (slightly different)
        benchmark = sample_returns * 0.9 + 0.0001

        info_ratio = _calculate_information_ratio(
            sample_returns,
            benchmark,
            periods_per_year=252
        )

        # Should return a float
        assert isinstance(info_ratio, float)
        # Should be non-zero (returns differ from benchmark)
        assert info_ratio != 0

    def test_information_ratio_no_benchmark(self, sample_returns):
        """Test Information Ratio without benchmark."""
        from backend.mcp_servers.risk_analyzer_mcp import _calculate_information_ratio

        info_ratio = _calculate_information_ratio(
            sample_returns,
            benchmark_returns=None,
            periods_per_year=252
        )

        # Should return None when no benchmark is provided
        assert info_ratio is None
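
    # Reference formula, for context (assumption: the helper follows the usual
    # convention): IR = mean(active) / std(active) * sqrt(periods_per_year),
    # where active = portfolio_returns - benchmark_returns.
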
    @pytest.mark.asyncio
    async def test_advanced_metrics_in_risk_analysis(self):
        """Test that advanced metrics are included in risk analysis."""
        from backend.mcp_servers.risk_analyzer_mcp import analyze_risk, RiskAnalysisRequest

        # Create sample portfolio data
        portfolio = [
            {
                "ticker": "AAPL",
                "weight": 0.5,
                "prices": [float(x) for x in range(100, 120)],
            },
            {
                "ticker": "GOOGL",
                "weight": 0.5,
                "prices": [float(x) for x in range(150, 170)],
            },
        ]

        request = RiskAnalysisRequest(
            portfolio=portfolio,
            portfolio_value=Decimal("50000"),
            confidence_level=0.95,
            method="historical",
        )

        result = await analyze_risk.fn(request)

        # Check that advanced metrics are present
        assert result.risk_metrics is not None
        assert hasattr(result.risk_metrics, 'calmar_ratio')
        assert hasattr(result.risk_metrics, 'ulcer_index')
    @pytest.mark.asyncio
    async def test_risk_analysis_with_benchmark(self):
        """Test risk analysis with benchmark for Information Ratio calculation."""
        from backend.mcp_servers.risk_analyzer_mcp import (
            analyze_risk,
            RiskAnalysisRequest,
            BenchmarkInput,
        )

        # Create sample portfolio data with realistic price movements
        np.random.seed(42)
        base_price = 100
        portfolio_prices = [base_price]
        for _ in range(99):
            change = np.random.normal(0.001, 0.02)
            portfolio_prices.append(portfolio_prices[-1] * (1 + change))

        # Create benchmark prices (slightly different performance)
        benchmark_prices = [base_price]
        for _ in range(99):
            change = np.random.normal(0.0008, 0.015)
            benchmark_prices.append(benchmark_prices[-1] * (1 + change))

        portfolio = [
            {
                "ticker": "PORTFOLIO",
                "weight": 1.0,
                "prices": [Decimal(str(p)) for p in portfolio_prices],
            }
        ]
        benchmark = BenchmarkInput(
            ticker="SPY",
            prices=[Decimal(str(p)) for p in benchmark_prices]
        )

        request = RiskAnalysisRequest(
            portfolio=portfolio,
            portfolio_value=Decimal("100000"),
            confidence_level=0.95,
            method="historical",
            benchmark=benchmark,
        )

        result = await analyze_risk.fn(request)

        # Check that the Information Ratio is calculated
        assert result.risk_metrics is not None
        assert result.risk_metrics.information_ratio is not None
        # Information Ratio should be returned as a Decimal
        assert isinstance(result.risk_metrics.information_ratio, Decimal)
        # Should be non-zero since the portfolio differs from the benchmark
        assert float(result.risk_metrics.information_ratio) != 0
    @pytest.mark.asyncio
    async def test_risk_analysis_without_benchmark_backward_compatibility(self):
        """Test backward compatibility - risk analysis without benchmark."""
        from backend.mcp_servers.risk_analyzer_mcp import analyze_risk, RiskAnalysisRequest

        # Create sample portfolio data
        portfolio = [
            {
                "ticker": "AAPL",
                "weight": 1.0,
                "prices": [Decimal(str(x)) for x in range(100, 150)],
            }
        ]

        # Request without benchmark (backward compatible)
        request = RiskAnalysisRequest(
            portfolio=portfolio,
            portfolio_value=Decimal("50000"),
            confidence_level=0.95,
            method="historical",
        )

        result = await analyze_risk.fn(request)

        # Should complete successfully
        assert result.risk_metrics is not None
        # Information Ratio should be None when no benchmark is provided
        assert result.risk_metrics.information_ratio is None
        # Other metrics should still be present
        assert result.risk_metrics.sharpe_ratio is not None
        assert result.risk_metrics.calmar_ratio is not None
    @pytest.mark.asyncio
    async def test_risk_analysis_benchmark_length_mismatch(self):
        """Test handling of a benchmark with a different length than the portfolio."""
        from backend.mcp_servers.risk_analyzer_mcp import (
            analyze_risk,
            RiskAnalysisRequest,
            BenchmarkInput,
        )

        # Generate realistic price data with variance
        np.random.seed(123)

        # Portfolio with 50 data points
        portfolio_prices = [100.0]
        for _ in range(49):
            change = np.random.normal(0.001, 0.015)
            portfolio_prices.append(portfolio_prices[-1] * (1 + change))

        # Benchmark with only 30 data points
        benchmark_prices = [100.0]
        for _ in range(29):
            change = np.random.normal(0.0008, 0.012)
            benchmark_prices.append(benchmark_prices[-1] * (1 + change))

        portfolio = [
            {
                "ticker": "AAPL",
                "weight": 1.0,
                "prices": [Decimal(str(p)) for p in portfolio_prices],
            }
        ]
        benchmark = BenchmarkInput(
            ticker="SPY",
            prices=[Decimal(str(p)) for p in benchmark_prices]
        )

        request = RiskAnalysisRequest(
            portfolio=portfolio,
            portfolio_value=Decimal("50000"),
            confidence_level=0.95,
            method="historical",
            benchmark=benchmark,
        )

        result = await analyze_risk.fn(request)

        # Should handle the length mismatch gracefully
        assert result.risk_metrics is not None
        # Information Ratio should still be calculated after alignment
        assert result.risk_metrics.information_ratio is not None
    @pytest.mark.asyncio
    async def test_risk_analysis_benchmark_error_handling(self):
        """Test error handling for invalid benchmark data."""
        from backend.mcp_servers.risk_analyzer_mcp import (
            analyze_risk,
            RiskAnalysisRequest,
            BenchmarkInput,
        )

        portfolio = [
            {
                "ticker": "AAPL",
                "weight": 1.0,
                "prices": [Decimal(str(x)) for x in range(100, 150)],
            }
        ]

        # Benchmark with only one data point (will cause an error in the returns calculation)
        benchmark = BenchmarkInput(
            ticker="SPY",
            prices=[Decimal("100")]
        )

        request = RiskAnalysisRequest(
            portfolio=portfolio,
            portfolio_value=Decimal("50000"),
            confidence_level=0.95,
            method="historical",
            benchmark=benchmark,
        )

        result = await analyze_risk.fn(request)

        # Should handle the error gracefully and continue without the benchmark
        assert result.risk_metrics is not None
        # Information Ratio should be None due to the benchmark error
        assert result.risk_metrics.information_ratio is None
        # Other metrics should still work
        assert result.risk_metrics.sharpe_ratio is not None

# GARCH Forecasting Tests

class TestGARCHForecasting:
    """Test GARCH volatility forecasting."""

    @pytest.fixture
    def sample_returns_for_garch(self):
        """Sample returns for GARCH testing."""
        np.random.seed(42)
        # Generate returns with volatility clustering
        returns = []
        vol = 0.02
        for _ in range(500):
            vol = 0.95 * vol + 0.05 * 0.02 + 0.1 * np.random.normal(0, 0.01)
            returns.append(np.random.normal(0, vol))
        return returns
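
    # For context (assumption: the MCP fits a standard GARCH(1,1) model):
    # sigma_t^2 = omega + alpha * r_{t-1}^2 + beta * sigma_{t-1}^2,
    # with persistence = alpha + beta, which should lie in [0, 1) for a
    # stationary fit — hence the bound checked below.
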
    @pytest.mark.asyncio
    async def test_garch_forecast_structure(self, sample_returns_for_garch):
        """Test that the GARCH forecast returns the correct structure."""
        from backend.mcp_servers.risk_analyzer_mcp import forecast_volatility_garch, GARCHForecastRequest

        try:
            request = GARCHForecastRequest(
                ticker="TEST",
                returns=[Decimal(str(r)) for r in sample_returns_for_garch],
                forecast_horizon=30,
                garch_p=1,
                garch_q=1,
            )

            result = await forecast_volatility_garch.fn(request)

            # Check the result structure
            assert result.ticker == "TEST"
            assert result.model == "GARCH(1,1)"
            assert len(result.forecast_volatility) == 30
            assert len(result.annualised_volatility) == 30
            assert result.persistence is not None
            assert 0 <= float(result.persistence) <= 1
            assert result.model_diagnostics is not None
        except RuntimeError as e:
            if "arch library not available" in str(e):
                pytest.skip("arch library not installed")
            raise
    @pytest.mark.asyncio
    async def test_garch_not_available_handling(self):
        """Test graceful handling when GARCH is not available."""
        from backend.mcp_servers.risk_analyzer_mcp import forecast_volatility_garch, GARCHForecastRequest

        with patch('backend.mcp_servers.risk_analyzer_mcp.GARCH_AVAILABLE', False):
            request = GARCHForecastRequest(
                ticker="TEST",
                returns=[Decimal("0.01")] * 100,
                forecast_horizon=10,
            )

            with pytest.raises(RuntimeError, match="GARCH forecasting requires"):
                await forecast_volatility_garch.fn(request)

# Ensemble Predictor Tests

class TestEnsemblePredictor:
    """Test Ensemble Predictor MCP server."""

    @pytest.fixture
    def sample_prices_for_forecast(self):
        """Sample price data for forecasting."""
        # Create a realistic price series (seeded for reproducibility)
        np.random.seed(42)
        prices = [100.0]
        for _ in range(99):
            change = np.random.normal(0.001, 0.02)
            prices.append(prices[-1] * (1 + change))
        return [Decimal(str(p)) for p in prices]
    def test_preprocess_financial_series(self):
        """Test financial series preprocessing."""
        from backend.mcp_servers.ensemble_predictor_mcp import _preprocess_financial_series

        prices = np.array([100, 105, 103, 108, 102])

        # Test with returns
        preprocessed = _preprocess_financial_series(prices, use_returns=True)
        assert len(preprocessed) == len(prices) - 1  # Returns series is one element shorter
        assert not np.any(np.isnan(preprocessed))
        assert not np.any(np.isinf(preprocessed))

        # Test with log prices
        preprocessed_log = _preprocess_financial_series(prices, use_returns=False)
        assert len(preprocessed_log) == len(prices)
        assert np.all(preprocessed_log > 0)  # Log of prices above 1 is positive
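
    # Assumed preprocessing conventions (not confirmed against the helper's code):
    # use_returns=True yields simple returns r_t = p_t / p_{t-1} - 1 (length n - 1);
    # use_returns=False yields log prices log(p_t) (length n).
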
    def test_postprocess_predictions(self):
        """Test prediction postprocessing."""
        from backend.mcp_servers.ensemble_predictor_mcp import _postprocess_predictions

        last_price = 100.0
        predictions = np.array([0.01, 0.02, -0.01, 0.015])  # Returns

        prices = _postprocess_predictions(predictions, last_price, use_returns=True)

        # Prices should be positive
        assert np.all(prices > 0)
        # One output price per predicted return
        assert len(prices) == len(predictions)
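
    # Assumed convention (for context): predicted returns are compounded off the
    # last observed price, i.e. price_k = last_price * prod_{i<=k}(1 + r_i).
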
    def test_naive_forecast(self):
        """Test naive forecasting baseline."""
        from backend.mcp_servers.ensemble_predictor_mcp import _forecast_naive

        context = np.random.normal(0.001, 0.02, 100)
        horizon = 30

        forecast, lower, upper = _forecast_naive(context, horizon)

        # Check shapes
        assert len(forecast) == horizon
        assert len(lower) == horizon
        assert len(upper) == horizon
        # Check uncertainty bounds
        assert np.all(upper >= forecast)
        assert np.all(forecast >= lower)

    def test_moving_average_forecast(self):
        """Test moving average forecasting."""
        from backend.mcp_servers.ensemble_predictor_mcp import _forecast_moving_average

        context = np.random.normal(0.001, 0.02, 100)
        horizon = 30

        forecast, lower, upper = _forecast_moving_average(context, horizon, window=20)

        # Check shapes
        assert len(forecast) == horizon
        assert len(lower) == horizon
        assert len(upper) == horizon
        # Forecast should be constant (MA of the last window)
        assert np.allclose(forecast, forecast[0])
    def test_combine_forecasts_mean(self):
        """Test ensemble combination with mean."""
        from backend.mcp_servers.ensemble_predictor_mcp import _combine_forecasts

        forecasts = {
            "model1": np.array([1.0, 2.0, 3.0]),
            "model2": np.array([2.0, 3.0, 4.0]),
            "model3": np.array([3.0, 4.0, 5.0]),
        }

        combined = _combine_forecasts(forecasts, method="mean")

        # Should be the element-wise mean of the forecasts
        expected = np.array([2.0, 3.0, 4.0])
        assert np.allclose(combined, expected)

    def test_combine_forecasts_median(self):
        """Test ensemble combination with median."""
        from backend.mcp_servers.ensemble_predictor_mcp import _combine_forecasts

        forecasts = {
            "model1": np.array([1.0, 2.0, 3.0]),
            "model2": np.array([2.0, 3.0, 4.0]),
            "model3": np.array([10.0, 11.0, 12.0]),  # Outlier
        }

        combined = _combine_forecasts(forecasts, method="median")

        # Should be the median (robust to outliers)
        expected = np.array([2.0, 3.0, 4.0])
        assert np.allclose(combined, expected)

    def test_combine_forecasts_weighted(self):
        """Test ensemble combination with custom weights."""
        from backend.mcp_servers.ensemble_predictor_mcp import _combine_forecasts

        forecasts = {
            "model1": np.array([1.0, 2.0, 3.0]),
            "model2": np.array([2.0, 3.0, 4.0]),
        }
        weights = {"model1": 0.7, "model2": 0.3}

        combined = _combine_forecasts(forecasts, method="weighted", weights=weights)

        # Should be the weighted average
        expected = 0.7 * forecasts["model1"] + 0.3 * forecasts["model2"]
        assert np.allclose(combined, expected)
    @pytest.mark.asyncio
    async def test_ensemble_forecast_without_chronos(self, sample_prices_for_forecast):
        """Test ensemble forecast using only statistical models."""
        from backend.mcp_servers.ensemble_predictor_mcp import forecast_ensemble, ForecastRequest

        # Patch Chronos to be unavailable
        with patch('backend.mcp_servers.ensemble_predictor_mcp.CHRONOS_AVAILABLE', False):
            request = ForecastRequest(
                ticker="TEST",
                prices=sample_prices_for_forecast,
                forecast_horizon=10,
                use_returns=True,
                ensemble_method="mean",
            )

            result = await forecast_ensemble.fn(request)

            # Should still work with the statistical baselines
            assert result.ticker == "TEST"
            assert len(result.predictions) == 10
            assert len(result.lower_bound) == 10
            assert len(result.upper_bound) == 10
            assert len(result.models_used) >= 2  # At least naive and moving average
            assert "chronos" not in result.models_used

# Workflow Phase 2.5 Tests

class TestWorkflowPhase2_5:
    """Test Phase 2.5 integration in the workflow."""

    @pytest.mark.asyncio
    async def test_phase_2_5_adds_ensemble_forecasts(self):
        """Test that Phase 2.5 adds ensemble forecasts to the state."""
        from backend.agents.workflow import PortfolioAnalysisWorkflow
        from backend.models.agent_state import AgentState

        # Create a mock MCP router
        mock_router = Mock()
        mock_router.call_ensemble_predictor_mcp = AsyncMock(return_value={
            "ticker": "AAPL",
            "predictions": [Decimal("150.0")] * 30,
            "lower_bound": [Decimal("145.0")] * 30,
            "upper_bound": [Decimal("155.0")] * 30,
            "models_used": ["naive", "moving_average"],
            "metadata": {"num_models": "2"},
        })

        # Mock the analyst agent to avoid the API key requirement
        with patch('backend.agents.workflow.PortfolioAnalystAgent'):
            workflow = PortfolioAnalysisWorkflow(mock_router)

            # Create the initial state
            state: AgentState = {
                "portfolio_id": "test",
                "user_query": "test",
                "risk_tolerance": "moderate",
                "holdings": [
                    {
                        "ticker": "AAPL",
                        "quantity": 10,
                        "dollar_amount": 0,
                        "current_price": 150.0,
                        "market_value": 1500.0,
                        "weight": 1.0,
                    }
                ],
                "historical_prices": {
                    "AAPL": {
                        "close_prices": [float(x) for x in range(140, 160)],
                        "dates": [],
                    }
                },
                "fundamentals": {},
                "economic_data": {},
                "realtime_data": {},
                "technical_indicators": {},
                "optimisation_results": {},
                "risk_analysis": {},
                "ensemble_forecasts": {},
                "ai_synthesis": "",
                "recommendations": [],
                "reasoning_steps": [],
                "current_step": "phase_2_complete",
                "errors": [],
                "mcp_calls": [],
                "phase_1_duration_ms": None,
                "phase_2_duration_ms": None,
                "phase_2_5_duration_ms": None,
                "phase_3_duration_ms": None,
                "llm_input_tokens": None,
                "llm_output_tokens": None,
                "llm_total_tokens": None,
                "llm_request_count": None,
            }

            # Run Phase 2.5
            result_state = await workflow._phase_2_5_ml_predictions(state)

            # Check that ensemble forecasts were added
            assert "ensemble_forecasts" in result_state
            assert result_state["current_step"] == "phase_2_5_complete"
            assert result_state["phase_2_5_duration_ms"] is not None
            assert result_state["phase_2_5_duration_ms"] >= 0  # Can be 0 for fast tests
    def test_agent_state_has_phase_2_5_fields(self):
        """Test that AgentState includes the Phase 2.5 fields."""
        from backend.models.agent_state import AgentState

        # Check that type hints exist for the Phase 2.5 fields
        annotations = AgentState.__annotations__
        assert "ensemble_forecasts" in annotations
        assert "phase_2_5_duration_ms" in annotations

# GPU/CPU Fallback Tests

class TestGPUCPUFallback:
    """Test GPU detection and CPU fallback."""
    def test_gpu_detection_with_cuda(self, mock_torch):
        """Test that the GPU is selected when CUDA is available."""
        mock_torch.cuda.is_available.return_value = True

        # This would be verified by checking the log messages;
        # in a real scenario we would inspect the device assignment.
        assert mock_torch.cuda.is_available() is True

    def test_cpu_fallback_without_cuda(self, mock_torch):
        """Test that the CPU is selected when CUDA is not available."""
        mock_torch.cuda.is_available.return_value = False

        assert mock_torch.cuda.is_available() is False

if __name__ == "__main__":
    pytest.main([__file__, "-v"])