"""Tests for P1 hackathon features.
Tests cover:
- Advanced risk metrics (Information Ratio, Calmar Ratio, Ulcer Index)
- GARCH volatility forecasting
- Ensemble predictor with Chronos
- Workflow Phase 2.5 integration
"""
import pytest
import numpy as np
import pandas as pd
from decimal import Decimal
from unittest.mock import Mock, patch, AsyncMock
# Advanced Risk Metrics Tests
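# Reference sketches (illustrative only) of the drawdown-based metrics exercised
# below, assuming their textbook definitions. The production implementations live in
# backend.mcp_servers.risk_analyzer_mcp and may differ in detail; these _reference_*
# helpers are purely documentary and are not imported by the tests.
def _reference_ulcer_index(prices: pd.Series, lookback_period: int = 14) -> float:
    """Root-mean-square percentage drawdown from the running peak over the window."""
    window = prices.tail(lookback_period).astype(float)
    running_peak = window.cummax()
    drawdown_pct = (window - running_peak) / running_peak * 100.0
    return float(np.sqrt((drawdown_pct ** 2).mean()))


def _reference_calmar_ratio(returns: pd.Series, risk_free_rate: float = 0.02,
                            periods_per_year: int = 252) -> float:
    """Annualised excess return divided by the absolute maximum drawdown."""
    annualised_return = float(returns.mean()) * periods_per_year
    cumulative = (1.0 + returns).cumprod()
    max_drawdown = float((cumulative / cumulative.cummax() - 1.0).min())
    if max_drawdown == 0:
        return 0.0
    return (annualised_return - risk_free_rate) / abs(max_drawdown)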
class TestAdvancedRiskMetrics:
"""Test advanced performance metrics in Risk Analyzer MCP."""
@pytest.fixture
def sample_returns(self):
"""Sample returns data for testing."""
np.random.seed(42)
# Generate returns with known properties
returns = pd.Series(np.random.normal(0.001, 0.02, 252)) # ~0.1% daily, 2% vol
return returns
@pytest.fixture
def sample_prices(self):
"""Sample price data for testing."""
# Create price series with drawdowns
prices = pd.Series([100, 105, 103, 108, 102, 98, 95, 100, 105, 110])
return prices
def test_ulcer_index_calculation(self, sample_prices):
"""Test Ulcer Index calculation."""
from backend.mcp_servers.risk_analyzer_mcp import _calculate_ulcer_index
ulcer = _calculate_ulcer_index(sample_prices, lookback_period=5)
# Ulcer Index should be non-negative
assert ulcer >= 0
# Should be a float
assert isinstance(ulcer, float)
def test_ulcer_index_no_drawdown(self):
"""Test Ulcer Index with no drawdowns."""
from backend.mcp_servers.risk_analyzer_mcp import _calculate_ulcer_index
# Monotonically increasing prices
prices = pd.Series([100, 101, 102, 103, 104, 105])
ulcer = _calculate_ulcer_index(prices, lookback_period=3)
# Should be close to zero (no drawdowns)
assert ulcer < 1.0
def test_calmar_ratio_calculation(self, sample_returns):
"""Test Calmar Ratio calculation."""
from backend.mcp_servers.risk_analyzer_mcp import _calculate_calmar_ratio
calmar = _calculate_calmar_ratio(sample_returns, risk_free_rate=0.02)
# Calmar ratio should be a float
assert isinstance(calmar, float)
# Should be non-zero unless the return series never declines (no drawdown)
assert calmar != 0 or sample_returns.min() >= 0
def test_information_ratio_with_benchmark(self, sample_returns):
"""Test Information Ratio with benchmark."""
from backend.mcp_servers.risk_analyzer_mcp import _calculate_information_ratio
# Create benchmark returns (slightly different)
benchmark = sample_returns * 0.9 + 0.0001
info_ratio = _calculate_information_ratio(
sample_returns,
benchmark,
periods_per_year=252
)
# Should return a float
assert isinstance(info_ratio, float)
# Should be non-zero (returns differ from benchmark)
assert info_ratio != 0
def test_information_ratio_no_benchmark(self, sample_returns):
"""Test Information Ratio without benchmark."""
from backend.mcp_servers.risk_analyzer_mcp import _calculate_information_ratio
info_ratio = _calculate_information_ratio(
sample_returns,
benchmark_returns=None,
periods_per_year=252
)
# Should return None when no benchmark
assert info_ratio is None
@pytest.mark.asyncio
async def test_advanced_metrics_in_risk_analysis(self):
"""Test that advanced metrics are included in risk analysis."""
from backend.mcp_servers.risk_analyzer_mcp import analyze_risk, RiskAnalysisRequest
# Create sample portfolio data
portfolio = [
{
"ticker": "AAPL",
"weight": 0.5,
"prices": [float(x) for x in range(100, 120)],
},
{
"ticker": "GOOGL",
"weight": 0.5,
"prices": [float(x) for x in range(150, 170)],
}
]
request = RiskAnalysisRequest(
portfolio=portfolio,
portfolio_value=Decimal("50000"),
confidence_level=0.95,
method="historical",
)
result = await analyze_risk.fn(request)
# Check that advanced metrics are present
assert result.risk_metrics is not None
assert hasattr(result.risk_metrics, 'calmar_ratio')
assert hasattr(result.risk_metrics, 'ulcer_index')
@pytest.mark.asyncio
async def test_risk_analysis_with_benchmark(self):
"""Test risk analysis with benchmark for Information Ratio calculation."""
from backend.mcp_servers.risk_analyzer_mcp import (
analyze_risk,
RiskAnalysisRequest,
BenchmarkInput
)
# Create sample portfolio data with realistic price movements
np.random.seed(42)
base_prices = 100
portfolio_prices = [base_prices]
for _ in range(99):
change = np.random.normal(0.001, 0.02)
portfolio_prices.append(portfolio_prices[-1] * (1 + change))
# Create benchmark prices (slightly different performance)
benchmark_prices = [base_prices]
for _ in range(99):
change = np.random.normal(0.0008, 0.015)
benchmark_prices.append(benchmark_prices[-1] * (1 + change))
portfolio = [
{
"ticker": "PORTFOLIO",
"weight": 1.0,
"prices": [Decimal(str(p)) for p in portfolio_prices],
}
]
benchmark = BenchmarkInput(
ticker="SPY",
prices=[Decimal(str(p)) for p in benchmark_prices]
)
request = RiskAnalysisRequest(
portfolio=portfolio,
portfolio_value=Decimal("100000"),
confidence_level=0.95,
method="historical",
benchmark=benchmark
)
result = await analyze_risk.fn(request)
# Check that Information Ratio is calculated
assert result.risk_metrics is not None
assert result.risk_metrics.information_ratio is not None
# Information Ratio should be a reasonable value
assert isinstance(result.risk_metrics.information_ratio, Decimal)
# Should be non-zero since portfolio differs from benchmark
assert float(result.risk_metrics.information_ratio) != 0
@pytest.mark.asyncio
async def test_risk_analysis_without_benchmark_backward_compatibility(self):
"""Test backward compatibility - risk analysis without benchmark."""
from backend.mcp_servers.risk_analyzer_mcp import analyze_risk, RiskAnalysisRequest
# Create sample portfolio data
portfolio = [
{
"ticker": "AAPL",
"weight": 1.0,
"prices": [Decimal(str(x)) for x in range(100, 150)],
}
]
# Request without benchmark (backward compatible)
request = RiskAnalysisRequest(
portfolio=portfolio,
portfolio_value=Decimal("50000"),
confidence_level=0.95,
method="historical",
)
result = await analyze_risk.fn(request)
# Should complete successfully
assert result.risk_metrics is not None
# Information Ratio should be None when no benchmark provided
assert result.risk_metrics.information_ratio is None
# Other metrics should still be present
assert result.risk_metrics.sharpe_ratio is not None
assert result.risk_metrics.calmar_ratio is not None
@pytest.mark.asyncio
async def test_risk_analysis_benchmark_length_mismatch(self):
"""Test handling of benchmark with different length than portfolio."""
from backend.mcp_servers.risk_analyzer_mcp import (
analyze_risk,
RiskAnalysisRequest,
BenchmarkInput
)
# Generate realistic price data with variance
np.random.seed(123)
# Portfolio with 50 data points
portfolio_prices = [100.0]
for _ in range(49):
change = np.random.normal(0.001, 0.015)
portfolio_prices.append(portfolio_prices[-1] * (1 + change))
# Benchmark with only 30 data points
benchmark_prices = [100.0]
for _ in range(29):
change = np.random.normal(0.0008, 0.012)
benchmark_prices.append(benchmark_prices[-1] * (1 + change))
portfolio = [
{
"ticker": "AAPL",
"weight": 1.0,
"prices": [Decimal(str(p)) for p in portfolio_prices],
}
]
benchmark = BenchmarkInput(
ticker="SPY",
prices=[Decimal(str(p)) for p in benchmark_prices]
)
request = RiskAnalysisRequest(
portfolio=portfolio,
portfolio_value=Decimal("50000"),
confidence_level=0.95,
method="historical",
benchmark=benchmark
)
result = await analyze_risk.fn(request)
# Should handle length mismatch gracefully
assert result.risk_metrics is not None
# Information Ratio should still be calculated after alignment
assert result.risk_metrics.information_ratio is not None
@pytest.mark.asyncio
async def test_risk_analysis_benchmark_error_handling(self):
"""Test error handling for invalid benchmark data."""
from backend.mcp_servers.risk_analyzer_mcp import (
analyze_risk,
RiskAnalysisRequest,
BenchmarkInput
)
portfolio = [
{
"ticker": "AAPL",
"weight": 1.0,
"prices": [Decimal(str(x)) for x in range(100, 150)],
}
]
# Benchmark with only 1 data point (will cause error in returns calculation)
benchmark = BenchmarkInput(
ticker="SPY",
prices=[Decimal("100")]
)
request = RiskAnalysisRequest(
portfolio=portfolio,
portfolio_value=Decimal("50000"),
confidence_level=0.95,
method="historical",
benchmark=benchmark
)
result = await analyze_risk.fn(request)
# Should handle error gracefully and continue without benchmark
assert result.risk_metrics is not None
# Information Ratio should be None due to benchmark error
assert result.risk_metrics.information_ratio is None
# Other metrics should still work
assert result.risk_metrics.sharpe_ratio is not None
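# Minimal sketch of the Information Ratio checked in the benchmark tests above,
# assuming the usual definition (annualised mean active return over annualised
# tracking error). Illustrative only; not the production implementation.
def _reference_information_ratio(returns: pd.Series, benchmark_returns: pd.Series,
                                 periods_per_year: int = 252) -> float:
    """Annualised active return divided by annualised tracking error."""
    # Align the two series first, which also tolerates length mismatches.
    aligned_r, aligned_b = returns.align(benchmark_returns, join="inner")
    active = aligned_r - aligned_b
    tracking_error = float(active.std()) * np.sqrt(periods_per_year)
    if tracking_error == 0:
        return 0.0
    return float(active.mean()) * periods_per_year / tracking_error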
# GARCH Forecasting Tests
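# The tests below check forecast shape, persistence and annualised volatility of a
# GARCH(1,1) fit. As a reminder of the mechanics only, here is a minimal numpy sketch
# of the GARCH(1,1) variance recursion with hand-picked parameters; the production
# server fits the model with the `arch` library instead, and daily returns are
# assumed for the sqrt(252) annualisation.
def _sketch_garch_forecast(returns, horizon=30, omega=1e-6, alpha=0.05, beta=0.90):
    """h-step-ahead volatility forecast from a GARCH(1,1) variance recursion."""
    returns = np.asarray(returns, dtype=float)
    persistence = alpha + beta  # < 1 for a stationary, mean-reverting model
    long_run_var = omega / (1.0 - persistence)
    # Filter the conditional variance through the sample.
    var_next = long_run_var
    for r in returns:
        var_next = omega + alpha * r ** 2 + beta * var_next
    # Multi-step forecasts decay geometrically towards the long-run variance.
    variance_path = [
        long_run_var + (persistence ** (h - 1)) * (var_next - long_run_var)
        for h in range(1, horizon + 1)
    ]
    daily_vol = np.sqrt(variance_path)
    annualised_vol = daily_vol * np.sqrt(252)
    return daily_vol, annualised_vol, persistence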
class TestGARCHForecasting:
"""Test GARCH volatility forecasting."""
@pytest.fixture
def sample_returns_for_garch(self):
"""Sample returns for GARCH testing."""
np.random.seed(42)
# Generate returns with volatility clustering
returns = []
vol = 0.02
for _ in range(500):
vol = 0.95 * vol + 0.05 * 0.02 + 0.1 * np.random.normal(0, 0.01)
returns.append(np.random.normal(0, vol))
return returns
@pytest.mark.asyncio
# __import__ needs fromlist to return the submodule; check the flag's value, not just its presence.
@pytest.mark.skipif(
    not getattr(__import__('backend.mcp_servers.risk_analyzer_mcp', fromlist=['GARCH_AVAILABLE']),
                'GARCH_AVAILABLE', False),
    reason="GARCH not available"
)
async def test_garch_forecast_structure(self, sample_returns_for_garch):
"""Test GARCH forecast returns correct structure."""
from backend.mcp_servers.risk_analyzer_mcp import forecast_volatility_garch, GARCHForecastRequest
try:
request = GARCHForecastRequest(
ticker="TEST",
returns=[Decimal(str(r)) for r in sample_returns_for_garch],
forecast_horizon=30,
garch_p=1,
garch_q=1,
)
result = await forecast_volatility_garch.fn(request)
# Check result structure
assert result.ticker == "TEST"
assert result.model == "GARCH(1,1)"
assert len(result.forecast_volatility) == 30
assert len(result.annualised_volatility) == 30
assert result.persistence is not None
assert 0 <= float(result.persistence) <= 1
assert result.model_diagnostics is not None
except RuntimeError as e:
if "arch library not available" in str(e):
pytest.skip("arch library not installed")
raise
@pytest.mark.asyncio
async def test_garch_not_available_handling(self):
"""Test graceful handling when GARCH not available."""
from backend.mcp_servers.risk_analyzer_mcp import forecast_volatility_garch, GARCHForecastRequest
with patch('backend.mcp_servers.risk_analyzer_mcp.GARCH_AVAILABLE', False):
request = GARCHForecastRequest(
ticker="TEST",
returns=[Decimal("0.01")] * 100,
forecast_horizon=10,
)
with pytest.raises(RuntimeError, match="GARCH forecasting requires"):
await forecast_volatility_garch.fn(request)
# Ensemble Predictor Tests
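# The ensemble tests below exercise preprocessing, statistical baseline forecasts and
# forecast combination. To illustrate the invariants they assert (lower <= forecast
# <= upper, with bands widening over the horizon), here is a minimal naive-baseline
# sketch assuming random-walk-style bands; the production helpers in
# backend.mcp_servers.ensemble_predictor_mcp may size their bands differently.
def _sketch_naive_forecast(context, horizon, z=1.96):
    """Repeat the last observation and widen the uncertainty bands with sqrt(h)."""
    context = np.asarray(context, dtype=float)
    sigma = context.std()
    forecast = np.full(horizon, context[-1])
    steps = np.arange(1, horizon + 1)
    lower = forecast - z * sigma * np.sqrt(steps)
    upper = forecast + z * sigma * np.sqrt(steps)
    return forecast, lower, upper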
class TestEnsemblePredictor:
"""Test Ensemble Predictor MCP server."""
@pytest.fixture
def sample_prices_for_forecast(self):
"""Sample price data for forecasting."""
# Create a realistic, reproducible price series
np.random.seed(42)
prices = [100.0]
for _ in range(99):
change = np.random.normal(0.001, 0.02)
prices.append(prices[-1] * (1 + change))
return [Decimal(str(p)) for p in prices]
def test_preprocess_financial_series(self):
"""Test financial series preprocessing."""
from backend.mcp_servers.ensemble_predictor_mcp import _preprocess_financial_series
prices = np.array([100, 105, 103, 108, 102])
# Test with returns
preprocessed = _preprocess_financial_series(prices, use_returns=True)
assert len(preprocessed) == len(prices) - 1 # Returns are one less
assert not np.any(np.isnan(preprocessed))
assert not np.any(np.isinf(preprocessed))
# Test with log prices
preprocessed_log = _preprocess_financial_series(prices, use_returns=False)
assert len(preprocessed_log) == len(prices)
assert np.all(preprocessed_log > 0) # Log of positive prices
def test_postprocess_predictions(self):
"""Test prediction postprocessing."""
from backend.mcp_servers.ensemble_predictor_mcp import _postprocess_predictions
last_price = 100.0
predictions = np.array([0.01, 0.02, -0.01, 0.015]) # Returns
prices = _postprocess_predictions(predictions, last_price, use_returns=True)
# Prices should be positive
assert np.all(prices > 0)
# One reconstructed price per prediction step
assert len(prices) == len(predictions)
def test_naive_forecast(self):
"""Test naive forecasting baseline."""
from backend.mcp_servers.ensemble_predictor_mcp import _forecast_naive
context = np.random.normal(0.001, 0.02, 100)
horizon = 30
forecast, lower, upper = _forecast_naive(context, horizon)
# Check shapes
assert len(forecast) == horizon
assert len(lower) == horizon
assert len(upper) == horizon
# Check uncertainty bounds
assert np.all(upper >= forecast)
assert np.all(forecast >= lower)
def test_moving_average_forecast(self):
"""Test moving average forecasting."""
from backend.mcp_servers.ensemble_predictor_mcp import _forecast_moving_average
context = np.random.normal(0.001, 0.02, 100)
horizon = 30
forecast, lower, upper = _forecast_moving_average(context, horizon, window=20)
# Check shapes
assert len(forecast) == horizon
assert len(lower) == horizon
assert len(upper) == horizon
# Forecast should be constant (MA of last window)
assert np.allclose(forecast, forecast[0])
def test_combine_forecasts_mean(self):
"""Test ensemble combination with mean."""
from backend.mcp_servers.ensemble_predictor_mcp import _combine_forecasts
forecasts = {
"model1": np.array([1.0, 2.0, 3.0]),
"model2": np.array([2.0, 3.0, 4.0]),
"model3": np.array([3.0, 4.0, 5.0]),
}
combined = _combine_forecasts(forecasts, method="mean")
# Should be mean of forecasts
expected = np.array([2.0, 3.0, 4.0])
assert np.allclose(combined, expected)
def test_combine_forecasts_median(self):
"""Test ensemble combination with median."""
from backend.mcp_servers.ensemble_predictor_mcp import _combine_forecasts
forecasts = {
"model1": np.array([1.0, 2.0, 3.0]),
"model2": np.array([2.0, 3.0, 4.0]),
"model3": np.array([10.0, 11.0, 12.0]), # Outlier
}
combined = _combine_forecasts(forecasts, method="median")
# Should be median (robust to outliers)
expected = np.array([2.0, 3.0, 4.0])
assert np.allclose(combined, expected)
def test_combine_forecasts_weighted(self):
"""Test ensemble combination with custom weights."""
from backend.mcp_servers.ensemble_predictor_mcp import _combine_forecasts
forecasts = {
"model1": np.array([1.0, 2.0, 3.0]),
"model2": np.array([2.0, 3.0, 4.0]),
}
weights = {"model1": 0.7, "model2": 0.3}
combined = _combine_forecasts(forecasts, method="weighted", weights=weights)
# Should be weighted average
expected = 0.7 * forecasts["model1"] + 0.3 * forecasts["model2"]
assert np.allclose(combined, expected)
@pytest.mark.asyncio
async def test_ensemble_forecast_without_chronos(self, sample_prices_for_forecast):
"""Test ensemble forecast using only statistical models."""
from backend.mcp_servers.ensemble_predictor_mcp import forecast_ensemble, ForecastRequest
# Patch Chronos to be unavailable
with patch('backend.mcp_servers.ensemble_predictor_mcp.CHRONOS_AVAILABLE', False):
request = ForecastRequest(
ticker="TEST",
prices=sample_prices_for_forecast,
forecast_horizon=10,
use_returns=True,
ensemble_method="mean",
)
result = await forecast_ensemble.fn(request)
# Should still work with statistical baselines
assert result.ticker == "TEST"
assert len(result.predictions) == 10
assert len(result.lower_bound) == 10
assert len(result.upper_bound) == 10
assert len(result.models_used) >= 2 # At least naive and MA
assert "chronos" not in result.models_used
# Workflow Phase 2.5 Tests
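# Hypothetical sketch of the kind of Phase 2.5 node the tests below expect, inferred
# only from what they assert (per-ticker ensemble forecasts added, current_step and
# phase_2_5_duration_ms set). The real implementation in backend.agents.workflow may
# differ substantially, and the router call's argument shape is not shown by the mock:
#
#     async def _phase_2_5_ml_predictions(self, state):
#         start = time.perf_counter()
#         for holding in state["holdings"]:
#             forecast = await self.mcp_router.call_ensemble_predictor_mcp(...)
#             state["ensemble_forecasts"][holding["ticker"]] = forecast
#         state["current_step"] = "phase_2_5_complete"
#         state["phase_2_5_duration_ms"] = int((time.perf_counter() - start) * 1000)
#         return state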
class TestWorkflowPhase2_5:
"""Test Phase 2.5 integration in workflow."""
@pytest.mark.asyncio
async def test_phase_2_5_adds_ensemble_forecasts(self):
"""Test that Phase 2.5 adds ensemble forecasts to state."""
from backend.agents.workflow import PortfolioAnalysisWorkflow
from backend.models.agent_state import AgentState
# Create mock MCP router
mock_router = Mock()
mock_router.call_ensemble_predictor_mcp = AsyncMock(return_value={
"ticker": "AAPL",
"predictions": [Decimal("150.0")] * 30,
"lower_bound": [Decimal("145.0")] * 30,
"upper_bound": [Decimal("155.0")] * 30,
"models_used": ["naive", "moving_average"],
"metadata": {"num_models": "2"}
})
# Mock the analyst agent to avoid API key requirement
with patch('backend.agents.workflow.PortfolioAnalystAgent'):
workflow = PortfolioAnalysisWorkflow(mock_router)
# Create initial state
state: AgentState = {
"portfolio_id": "test",
"user_query": "test",
"risk_tolerance": "moderate",
"holdings": [
{
"ticker": "AAPL",
"quantity": 10,
"dollar_amount": 0,
"current_price": 150.0,
"market_value": 1500.0,
"weight": 1.0,
}
],
"historical_prices": {
"AAPL": {
"close_prices": [float(x) for x in range(140, 160)],
"dates": [],
}
},
"fundamentals": {},
"economic_data": {},
"realtime_data": {},
"technical_indicators": {},
"optimisation_results": {},
"risk_analysis": {},
"ensemble_forecasts": {},
"ai_synthesis": "",
"recommendations": [],
"reasoning_steps": [],
"current_step": "phase_2_complete",
"errors": [],
"mcp_calls": [],
"phase_1_duration_ms": None,
"phase_2_duration_ms": None,
"phase_2_5_duration_ms": None,
"phase_3_duration_ms": None,
"llm_input_tokens": None,
"llm_output_tokens": None,
"llm_total_tokens": None,
"llm_request_count": None,
}
# Run Phase 2.5
result_state = await workflow._phase_2_5_ml_predictions(state)
# Check that ensemble forecasts were added
assert "ensemble_forecasts" in result_state
assert result_state["current_step"] == "phase_2_5_complete"
assert result_state["phase_2_5_duration_ms"] is not None
assert result_state["phase_2_5_duration_ms"] >= 0 # Can be 0 for fast tests
def test_agent_state_has_phase_2_5_fields(self):
"""Test that AgentState includes Phase 2.5 fields."""
from backend.models.agent_state import AgentState
# Check type hints exist for Phase 2.5 fields
annotations = AgentState.__annotations__
assert "ensemble_forecasts" in annotations
assert "phase_2_5_duration_ms" in annotations
# GPU/CPU Fallback Tests
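# These tests mock torch and only assert on the mock itself. For reference, the
# standard PyTorch device-selection pattern they approximate is sketched below
# (whether the production module follows it exactly is an assumption; only
# torch.cuda.is_available() is confirmed by the tests).
def _sketch_select_device():
    """Prefer CUDA when available, otherwise fall back to CPU."""
    import torch  # local import so this sketch does not require torch at module import
    return torch.device("cuda" if torch.cuda.is_available() else "cpu")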
class TestGPUCPUFallback:
"""Test GPU detection and CPU fallback."""
@patch('backend.mcp_servers.ensemble_predictor_mcp.torch')
def test_gpu_detection_with_cuda(self, mock_torch):
"""Test that GPU is selected when CUDA is available."""
mock_torch.cuda.is_available.return_value = True
# This would be tested by checking the log messages
# In a real scenario, we'd inspect the device assignment
assert mock_torch.cuda.is_available() is True
@patch('backend.mcp_servers.ensemble_predictor_mcp.torch')
def test_cpu_fallback_without_cuda(self, mock_torch):
"""Test that CPU is selected when CUDA is not available."""
mock_torch.cuda.is_available.return_value = False
assert mock_torch.cuda.is_available() is False
if __name__ == "__main__":
pytest.main([__file__, "-v"])