Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException, UploadFile, File, Form, Query, Path, WebSocket, WebSocketDisconnect | |
| from fastapi.responses import JSONResponse, FileResponse, StreamingResponse | |
| from fastapi.encoders import jsonable_encoder | |
| from typing import Optional, List | |
| from pydantic import BaseModel | |
| from core.security import get_current_user | |
| import sys | |
| import os | |
| import re | |
| import io | |
| import asyncio | |
| import logging | |
| import base64 | |
| import tempfile | |
| import subprocess | |
| from datetime import datetime | |
| from bson import ObjectId | |
| from bson.errors import InvalidId | |
| from pathlib import Path as PathLib | |
# Add the parent directory to the path to import utils: the directory two
# levels above this file's own directory is appended to sys.path.
_module_dir = os.path.dirname(__file__)
_import_root = os.path.dirname(os.path.dirname(_module_dir))
sys.path.append(_import_root)

# Configure Hugging Face cache directory to avoid permission issues.
# Each variable is only set when it is currently unset or empty.
_HF_CACHE_DIR = '/tmp/huggingface_cache'
for _cache_var in ('HF_HOME', 'TRANSFORMERS_CACHE', 'HF_DATASETS_CACHE'):
    if not os.getenv(_cache_var):
        os.environ[_cache_var] = _HF_CACHE_DIR

# Create cache directory if it doesn't exist
os.makedirs(_HF_CACHE_DIR, exist_ok=True)
# Import TxAgent. The package is optional: TXAGENT_AVAILABLE records whether
# the import succeeded so that get_txagent() can skip initialization.
sys.path.append('/app/src')
try:
    from src.txagent import TxAgent
except ImportError as e:
    logging.warning(f"TxAgent not available: {e}")
    TXAGENT_AVAILABLE = False
else:
    TXAGENT_AVAILABLE = True
try:
    from utils import clean_text_response, format_risk_level, create_notification
except ImportError:
    # Fallback: define the helpers locally if the shared module cannot be imported.

    def clean_text_response(text: str) -> str:
        """Collapse blank-line runs and repeated spaces, and strip ``**``/``__`` markers.

        Args:
            text: Raw text to tidy.

        Returns:
            The cleaned text with surrounding whitespace removed.
        """
        import re
        text = re.sub(r'\n\s*\n', '\n\n', text)
        text = re.sub(r'[ ]+', ' ', text)
        return text.replace("**", "").replace("__", "").strip()

    def format_risk_level(risk_level: Optional[str]) -> str:
        """Map a free-form risk level onto the canonical vocabulary.

        Args:
            risk_level: Level name in any letter case. ``None`` and
                non-string values are tolerated.

        Returns:
            One of ``'none'``, ``'low'``, ``'moderate'``, ``'high'`` or
            ``'severe'``; anything unrecognised maps to ``'none'``.
        """
        risk_level_mapping = {
            'low': 'low', 'medium': 'moderate', 'moderate': 'moderate',
            'high': 'high', 'severe': 'severe', 'critical': 'severe',
            'none': 'none', 'unknown': 'none'
        }
        # Stored records may carry a null or non-string level; treat those as
        # "none" instead of raising AttributeError on .lower().
        if risk_level is None:
            return 'none'
        return risk_level_mapping.get(str(risk_level).strip().lower(), 'none')

    def create_notification(user_id: str, title: str, message: str, notification_type: str = "info", patient_id: Optional[str] = None) -> dict:
        """Build an unread notification record stamped with the current UTC time.

        Args:
            user_id: Identifier of the user the notification belongs to.
            title: Short headline.
            message: Notification body.
            notification_type: Category label; defaults to ``"info"``.
            patient_id: Optional identifier of the related patient.

        Returns:
            A dict with the keys ``user_id``, ``title``, ``message``, ``type``,
            ``read``, ``timestamp`` and ``patient_id``.
        """
        return {
            "user_id": user_id, "title": title, "message": message,
            "type": notification_type, "read": False,
            "timestamp": datetime.utcnow(), "patient_id": patient_id
        }
try:
    from analysis import analyze_patient_report
except ImportError as e:
    # Make the degradation visible: without this warning the service would
    # silently hand out mock results that claim "success".
    logging.warning(f"analysis module not available, using mock analyze_patient_report: {e}")

    # Fallback: define a mock function if import fails
    def analyze_patient_report(patient_data):
        """Mock stand-in that ignores ``patient_data`` and returns a fixed payload."""
        return {"analysis": "Mock analysis", "status": "success"}
try:
    from voice import recognize_speech, text_to_speech, extract_text_from_pdf
except ImportError as e:
    # Make the degradation visible instead of silently serving mock data.
    logging.warning(f"voice module not available, using mock voice helpers: {e}")

    # Fallback: define mock functions if import fails
    def recognize_speech(audio_data):
        """Mock stand-in that ignores ``audio_data`` and returns a fixed transcription dict."""
        return {"transcription": "Mock transcription"}

    def text_to_speech(text, language="en-US"):
        """Mock stand-in that ignores its arguments and returns placeholder bytes."""
        return b"Mock audio data"

    def extract_text_from_pdf(pdf_data):
        """Mock stand-in that ignores ``pdf_data`` and returns placeholder text."""
        return "Mock PDF text"
# python-docx is optional; Document stays None when it cannot be imported.
Document = None
try:
    from docx import Document
except ImportError:
    pass

# Module-level logger
logger = logging.getLogger(__name__)

# Initialize TxAgent instance (populated lazily by get_txagent)
txagent_instance = None
def _normalize_risk_level(risk_level):
    """Return ``risk_level`` converted to the expected name via ``format_risk_level``."""
    normalized = format_risk_level(risk_level)
    return normalized
def _select_txagent_models():
    """Choose the (model name, RAG model name) pair to load.

    Probes the original TxAgent tokenizer; if anything in the probe fails the
    fallback pair is returned instead.
    """
    try:
        # Test if we can access the original models
        import torch
        from transformers import AutoTokenizer
        # NOTE(review): trust_remote_code=True lets the model repository run
        # its own code during loading - confirm this is intended.
        AutoTokenizer.from_pretrained("mims-harvard/TxAgent-T1-Llama-3.1-8B", trust_remote_code=True)
        logger.info("✅ Original TxAgent models are accessible")
        return "mims-harvard/TxAgent-T1-Llama-3.1-8B", "mims-harvard/ToolRAG-T1-GTE-Qwen2-1.5B"
    except Exception as model_error:
        logger.warning(f"⚠️ Original models not accessible, using fallback: {model_error}")
        # Fallback model and fallback RAG model
        return "microsoft/DialoGPT-medium", "sentence-transformers/all-MiniLM-L6-v2"


def get_txagent():
    """Get or create the module-wide TxAgent instance.

    Returns:
        The cached instance, or ``None`` when TxAgent is unavailable or its
        initialization failed (a later call will try again).
    """
    global txagent_instance

    # Nothing to do when an instance already exists or TxAgent cannot be imported.
    if txagent_instance is not None or not TXAGENT_AVAILABLE:
        return txagent_instance

    try:
        model_name, rag_model_name = _select_txagent_models()

        # Initialize TxAgent with available models
        agent_options = dict(
            model_name=model_name,
            rag_model_name=rag_model_name,
            enable_finish=True,
            enable_rag=False,  # Set to True if you want RAG functionality
            force_finish=True,
            enable_checker=True,
            step_rag_num=4,
            seed=42,
        )
        txagent_instance = TxAgent(**agent_options)
        txagent_instance.init_model()

        # Set the same chat prompt as the original
        txagent_instance.chat_prompt = (
            "You are a clinical assistant AI. Analyze the patient's data and provide clear clinical recommendations."
        )
        logger.info(f"✅ TxAgent initialized successfully with model: {model_name}")
    except Exception as e:
        logger.error(f"❌ Failed to initialize TxAgent: {e}")
        txagent_instance = None

    return txagent_instance
# Define the ChatRequest model with an optional patient_id
class ChatRequest(BaseModel):
    """Request body accepted by the chat endpoints."""

    # Text of the user's message
    message: str
    # Optional list of dicts; no handler visible in this part of the file reads it
    history: Optional[List[dict]] = None
    # Defaults to "clean"
    format: Optional[str] = "clean"
    # Defaults to 0.7
    temperature: Optional[float] = 0.7
    # Defaults to 512
    max_new_tokens: Optional[int] = 512
    # Optional patient reference; the streaming chat handler stores it with the chat record
    patient_id: Optional[str] = None
class VoiceOutputRequest(BaseModel):
    """Request body for speech synthesis."""

    # Text to be spoken
    text: str
    # Language tag; defaults to "en-US"
    language: str = "en-US"
    # Defaults to False
    slow: bool = False
    # Defaults to "mp3"
    return_format: str = "mp3"
class RiskLevel(BaseModel):
    """A risk level with its score and optional contributing factors."""

    # Level name
    level: str
    # Numeric score
    score: float
    # Optional list of factor descriptions
    factors: Optional[List[str]] = None
# Router for every TxAgent endpoint; all paths are served under /txagent.
# NOTE(review): no @router.<method>(...) decorators are visible on the handlers
# in this view of the file - confirm that each handler is actually registered.
router = APIRouter(
    prefix="/txagent",
    tags=["TxAgent"],
)
async def status(current_user: dict = Depends(get_current_user)):
    """Report that the service is running, with its version and feature list.

    Args:
        current_user: Authenticated user record; its ``email`` is logged.

    Returns:
        A dict with ``status``, ``timestamp`` (UTC, ISO format), ``version``
        and ``features``.
    """
    logger.info(f"Status endpoint accessed by {current_user['email']}")

    feature_names = [
        "chat",
        "voice-input",
        "voice-output",
        "patient-analysis",
        "report-upload",
        "patient-reports-pdf",
        "all-patients-reports-pdf",
    ]
    payload = {"status": "running"}
    payload["timestamp"] = datetime.utcnow().isoformat()
    payload["version"] = "2.6.0"
    payload["features"] = feature_names
    return payload
async def get_patient_analysis_results(
    name: Optional[str] = Query(None),
    current_user: dict = Depends(get_current_user)
):
    """Return the most recent analysis results, enriched with patient details.

    Args:
        name: Optional case-insensitive substring of the patient's full name.
            It is matched literally, not as a regular expression.
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        Up to 100 result dicts, newest first. Analyses whose patient no longer
        exists are skipped. An empty list is returned when nothing matches or
        when an unexpected error occurs (the error is logged).

    Raises:
        HTTPException: 403 when the user is neither doctor nor admin.
    """
    logger.info(f"Fetching analysis results by {current_user['email']}")
    try:
        # Check if user has appropriate permissions
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can access analysis results")

        # Import database collections
        from db.mongo import db
        patients_collection = db.patients
        analysis_collection = db.patient_analysis_results

        query = {}
        if name:
            # Escape the user-supplied text so regex metacharacters cannot
            # break the pattern or trigger expensive matching.
            name_regex = re.compile(re.escape(name), re.IGNORECASE)
            matching_patients = await patients_collection.find({"full_name": name_regex}).to_list(length=None)
            patient_ids = [p["fhir_id"] for p in matching_patients if "fhir_id" in p]
            if not patient_ids:
                return []
            query = {"patient_id": {"$in": patient_ids}}

        analyses = await analysis_collection.find(query).sort("timestamp", -1).to_list(length=100)

        # Several analyses usually belong to the same patient: look each
        # patient up once and reuse the document.
        patient_cache = {}
        enriched_results = []
        for analysis in analyses:
            analysis_patient_id = analysis.get("patient_id")
            if analysis_patient_id not in patient_cache:
                patient_cache[analysis_patient_id] = await patients_collection.find_one(
                    {"fhir_id": analysis_patient_id}
                )
            patient = patient_cache[analysis_patient_id]
            if not patient:
                continue  # Skip if patient no longer exists

            # Handle both the new format (suicide_risk object) and the old
            # format (top-level risk_level / risk_score / risk_factors). The
            # old fields are used whenever the object is absent or lacks a key.
            suicide_risk_data = analysis.get("suicide_risk")
            if not isinstance(suicide_risk_data, dict):
                suicide_risk_data = {}
            risk_level = suicide_risk_data.get("level", analysis.get("risk_level", "none"))
            risk_score = suicide_risk_data.get("score", analysis.get("risk_score", 0.0))
            risk_factors = suicide_risk_data.get("factors", analysis.get("risk_factors", []))

            formatted_analysis = {
                "_id": str(analysis["_id"]),
                "patient_id": analysis_patient_id,
                "full_name": patient.get("full_name", "Unknown"),
                "timestamp": analysis.get("timestamp"),
                "created_at": analysis.get("created_at"),
                "analysis_date": analysis.get("analysis_date"),
                "suicide_risk": {
                    "level": _normalize_risk_level(risk_level),
                    "score": risk_score,
                    "factors": risk_factors
                },
                "summary": analysis.get("summary", ""),
                "recommendations": analysis.get("recommendations", []),
                # Add patient demographic information for modal display
                "date_of_birth": patient.get("date_of_birth"),
                "gender": patient.get("gender"),
                "city": patient.get("city"),
                "state": patient.get("state"),
                "address": patient.get("address"),
                "postal_code": patient.get("postal_code"),
                "country": patient.get("country"),
                "marital_status": patient.get("marital_status"),
                "language": patient.get("language")
            }
            enriched_results.append(formatted_analysis)
        return enriched_results
    except HTTPException:
        # A permission failure must reach the client as 403, not as an empty list.
        raise
    except Exception as e:
        logger.error(f"Error fetching analysis results: {e}")
        return []
async def analyze_patients(
    current_user: dict = Depends(get_current_user)
):
    """Trigger analysis for all patients.

    Patients are analysed one after another; a failure for one patient is
    logged and does not stop the run.

    Args:
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        A dict with ``message`` and ``analyzed_count`` (plus ``total_patients``
        when at least one patient exists).

    Raises:
        HTTPException: 403 for insufficient role, 500 on unexpected errors.
    """
    logger.info(f"Triggering analysis for all patients by {current_user['email']}")
    try:
        # Check if user has appropriate permissions
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can trigger analysis")

        # Import database collections and analysis function
        from db.mongo import db
        from analysis import analyze_patient
        patients_collection = db.patients

        # Get all patients
        patients = await patients_collection.find({}).to_list(length=None)
        if not patients:
            return {"message": "No patients found to analyze", "analyzed_count": 0}

        analyzed_count = 0
        for patient in patients:
            try:
                await analyze_patient(patient)
                analyzed_count += 1
                logger.info(f"✅ Analyzed patient: {patient.get('full_name', 'Unknown')}")
            except Exception as e:
                logger.error(f"❌ Failed to analyze patient {patient.get('full_name', 'Unknown')}: {e}")
                continue

        return {
            "message": f"Analysis completed for {analyzed_count} patients",
            "analyzed_count": analyzed_count,
            "total_patients": len(patients)
        }
    except HTTPException:
        # Keep deliberate HTTP errors (the 403 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error triggering analysis: {e}")
        raise HTTPException(status_code=500, detail="Failed to trigger analysis")
async def analyze_specific_patient(
    patient_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Run the analysis for the single patient whose ``fhir_id`` is ``patient_id``.

    Args:
        patient_id: Value matched against the patient's ``fhir_id`` field.
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        A dict with ``message``, ``patient_id`` and ``patient_name``.

    Raises:
        HTTPException: 403 for insufficient role, 404 when no such patient
            exists, 500 on unexpected errors.
    """
    logger.info(f"Triggering analysis for patient {patient_id} by {current_user['email']}")
    try:
        # Only doctors and admins may proceed
        user_roles = current_user.get('roles', [])
        is_authorized = any(role in user_roles for role in ['doctor', 'admin'])
        if not is_authorized:
            raise HTTPException(status_code=403, detail="Only doctors and admins can trigger analysis")

        # Database handle and analysis routine are imported on demand
        from db.mongo import db
        from analysis import analyze_patient

        # Look the patient up by FHIR id
        patient_doc = await db.patients.find_one({"fhir_id": patient_id})
        if not patient_doc:
            raise HTTPException(status_code=404, detail="Patient not found")

        # Run the analysis, then report who was analysed
        await analyze_patient(patient_doc)
        display_name = patient_doc.get('full_name', 'Unknown')
        return {
            "message": f"Analysis completed for patient {display_name}",
            "patient_id": patient_id,
            "patient_name": display_name
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error analyzing patient {patient_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to analyze patient")
async def chat_with_txagent(
    request: ChatRequest,
    current_user: dict = Depends(get_current_user)
):
    """Chat with the integrated TxAgent.

    Currently a placeholder: the reply simply echoes the incoming message.

    Args:
        request: Chat payload; only ``message`` is read.
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        A dict with ``status``, ``response`` and ``mode``.

    Raises:
        HTTPException: 403 for insufficient role, 500 on unexpected errors.
    """
    try:
        # Check that the user is a doctor or an admin
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use TxAgent")

        # For now, return a simple response since the full TxAgent is not yet implemented
        response = f"TxAgent integrated response: {request.message}"
        return {
            "status": "success",
            "response": response,
            "mode": "integrated"
        }
    except HTTPException:
        # Keep deliberate HTTP errors (the 403 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error in TxAgent chat: {e}")
        raise HTTPException(status_code=500, detail="Failed to process chat request")
async def chat_stream_with_txagent(
    request: ChatRequest,
    current_user: dict = Depends(get_current_user)
):
    """Stream a chat reply as plain-text chunks and record the exchange.

    The reply is currently a fixed template around the user's message. The
    exchange is stored in the ``chats`` collection on a best-effort basis: a
    storage failure is logged and the reply is still streamed.

    Args:
        request: Chat payload; ``message`` and ``patient_id`` are read.
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        A ``StreamingResponse`` (``text/plain``) emitting three words at a time.

    Raises:
        HTTPException: 403 for insufficient role, 500 on unexpected errors.
    """
    try:
        # Check that the user is a doctor or an admin
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use TxAgent")

        # NOTE(review): the full message text is written to the log - confirm
        # this is acceptable for messages that may contain patient data.
        logger.info(f"Chat stream initiated by {current_user['email']}: {request.message}")

        # Generate a response (for now, a simple response)
        response_text = f"Hello! I'm your clinical assistant. You said: '{request.message}'. How can I help you with patient care today?"

        # Store the chat in the database
        try:
            from db.mongo import db
            chats_collection = db.chats
            chat_entry = {
                "message": request.message,
                "response": response_text,
                "user_id": current_user.get('_id'),
                "user_email": current_user.get('email'),
                "timestamp": datetime.utcnow(),
                # ChatRequest always declares patient_id, so no hasattr guard is needed.
                "patient_id": request.patient_id,
                "chat_type": "text_chat"
            }
            await chats_collection.insert_one(chat_entry)
            logger.info(f"Chat stored in database for user {current_user['email']}")
        except Exception as db_error:
            logger.error(f"Failed to store chat in database: {str(db_error)}")
            # Continue even if database storage fails

        # Return streaming response
        async def generate_response():
            # Simulate streaming by sending the response in chunks
            words = response_text.split()
            chunk_size = 3  # Send 3 words at a time
            for i in range(0, len(words), chunk_size):
                chunk = " ".join(words[i:i + chunk_size])
                if i + chunk_size < len(words):
                    chunk += " "  # Add space if not the last chunk
                yield chunk
                await asyncio.sleep(0.1)  # Small delay to simulate streaming

        return StreamingResponse(
            generate_response(),
            media_type="text/plain"
        )
    except HTTPException:
        # Keep deliberate HTTP errors (the 403 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error in TxAgent chat stream: {e}")
        raise HTTPException(status_code=500, detail="Failed to process chat stream request")
async def transcribe_audio(
    audio: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    """Voice transcription with the integrated TxAgent.

    Currently a placeholder: the uploaded audio is not read and a fixed mock
    transcription is returned.

    Args:
        audio: Uploaded audio file (unused for now).
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        A dict with ``status``, ``transcription`` and ``mode``.

    Raises:
        HTTPException: 403 for insufficient role, 500 on unexpected errors.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use voice features")

        # For now, return mock transcription
        return {
            "status": "success",
            "transcription": "Mock voice transcription from integrated TxAgent",
            "mode": "integrated"
        }
    except HTTPException:
        # Keep deliberate HTTP errors (the 403 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error in voice transcription: {e}")
        raise HTTPException(status_code=500, detail="Failed to transcribe audio")
async def synthesize_speech(
    request: VoiceOutputRequest,
    current_user: dict = Depends(get_current_user)
):
    """Speech synthesis with the integrated TxAgent.

    Currently a placeholder: the request fields are not read and fixed mock
    bytes are returned.

    Args:
        request: Synthesis payload (unused for now).
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        A ``StreamingResponse`` (``audio/mpeg``) offered as ``speech.mp3``.

    Raises:
        HTTPException: 403 for insufficient role, 500 on unexpected errors.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use voice features")

        # For now, return mock audio data
        audio_data = b"Mock audio data from integrated TxAgent"
        return StreamingResponse(
            iter([audio_data]),
            media_type="audio/mpeg",
            headers={"Content-Disposition": "attachment; filename=speech.mp3"}
        )
    except HTTPException:
        # Keep deliberate HTTP errors (the 403 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error in voice synthesis: {e}")
        raise HTTPException(status_code=500, detail="Failed to synthesize speech")
async def get_chats(current_user: dict = Depends(get_current_user)):
    """Return the chat history.

    NOTE(review): the query is not filtered by user, so the 50 newest chats
    of all users are returned - confirm this is intended.

    Args:
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        Up to 50 chat dicts, newest first, with ``id``, ``message``,
        ``response``, ``timestamp``, ``user_id`` and ``patient_id``.

    Raises:
        HTTPException: 403 for insufficient role, 500 on unexpected errors.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can access chat history")

        # Import database collections
        from db.mongo import db
        chats_collection = db.chats

        # Query local database for chat history
        cursor = chats_collection.find().sort("timestamp", -1).limit(50)
        chats = await cursor.to_list(length=50)

        history = []
        for chat in chats:
            chat_patient_id = chat.get("patient_id")
            history.append({
                "id": str(chat["_id"]),
                "message": chat.get("message", ""),
                "response": chat.get("response", ""),
                "timestamp": chat.get("timestamp"),
                "user_id": str(chat.get("user_id", "")),
                # Missing or empty patient ids are reported as None
                "patient_id": str(chat_patient_id) if chat_patient_id else None
            })
        return history
    except HTTPException:
        # Keep deliberate HTTP errors (the 403 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error getting chats: {e}")
        raise HTTPException(status_code=500, detail="Failed to get chats")
async def get_patient_analysis_reports_pdf(
    patient_id: str = Path(...),
    current_user: dict = Depends(get_current_user)
):
    """Generate PDF analysis reports for a specific patient.

    Args:
        patient_id: Value matched against the ``patient_id`` field of the
            stored analysis results.
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        A ``StreamingResponse`` (``application/pdf``) with one section per
        stored analysis.

    Raises:
        HTTPException: 403 for insufficient role, 404 when the patient has no
            analysis results, 500 on unexpected errors.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can generate PDF reports")
        logger.info(f"Generating PDF analysis reports for patient {patient_id} by {current_user['email']}")

        # Import database collections
        from db.mongo import db
        analysis_collection = db.patient_analysis_results

        # Find analysis results for the patient
        analysis_results = await analysis_collection.find({"patient_id": patient_id}).to_list(length=None)
        if not analysis_results:
            raise HTTPException(status_code=404, detail="No analysis results found for this patient")

        # Create a simple PDF report
        from reportlab.lib.pagesizes import letter
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import inch
        from reportlab.lib import colors
        from xml.sax.saxutils import escape
        import io

        # Create PDF buffer
        buffer = io.BytesIO()
        doc = SimpleDocTemplate(buffer, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Title
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=16,
            spaceAfter=30,
            alignment=1  # Center alignment
        )
        story.append(Paragraph("Patient Analysis Report", title_style))
        story.append(Spacer(1, 12))

        # Patient Information
        story.append(Paragraph("Patient Information", styles['Heading2']))
        story.append(Spacer(1, 12))

        # Get patient info from first analysis result
        first_result = analysis_results[0]
        patient_info = [
            ["Patient ID:", str(patient_id)],
            # Timestamps may be stored as datetime objects; render them as text.
            ["Analysis Date:", str(first_result.get('timestamp') or 'N/A')],
        ]
        patient_table = Table(patient_info, colWidths=[2*inch, 4*inch])
        patient_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (0, -1), colors.grey),
            ('TEXTCOLOR', (0, 0), (0, -1), colors.whitesmoke),
            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
            ('FONTNAME', (0, 0), (-1, -1), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, -1), 10),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('BACKGROUND', (1, 0), (1, -1), colors.beige),
            ('GRID', (0, 0), (-1, -1), 1, colors.black)
        ]))
        story.append(patient_table)
        story.append(Spacer(1, 20))

        # Analysis Results
        story.append(Paragraph("Analysis Results", styles['Heading2']))
        story.append(Spacer(1, 12))

        for i, result in enumerate(analysis_results):
            # Risk Assessment. Stored values may be missing, null or of an
            # unexpected type, so coerce each one before formatting it.
            suicide_risk = result.get('suicide_risk')
            if not isinstance(suicide_risk, dict):
                suicide_risk = {}
            risk_level = str(suicide_risk.get('level') or 'none')
            try:
                risk_score = float(suicide_risk.get('score') or 0.0)
            except (TypeError, ValueError):
                risk_score = 0.0
            risk_factors = suicide_risk.get('factors') or []
            if isinstance(risk_factors, str):
                risk_factors = [risk_factors]

            story.append(Paragraph(f"Analysis #{i+1}", styles['Heading3']))
            story.append(Spacer(1, 6))
            analysis_data = [
                ["Risk Level:", risk_level.upper()],
                ["Risk Score:", f"{risk_score:.2f}"],
                ["Risk Factors:", ", ".join(str(factor) for factor in risk_factors) if risk_factors else "None identified"],
                ["Analysis Date:", str(result.get('timestamp') or 'N/A')],
            ]
            analysis_table = Table(analysis_data, colWidths=[2*inch, 4*inch])
            analysis_table.setStyle(TableStyle([
                ('BACKGROUND', (0, 0), (0, -1), colors.lightblue),
                ('TEXTCOLOR', (0, 0), (0, -1), colors.black),
                ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
                ('FONTSIZE', (0, 0), (-1, -1), 9),
                ('BOTTOMPADDING', (0, 0), (-1, 0), 6),
                ('BACKGROUND', (1, 0), (1, -1), colors.white),
                ('GRID', (0, 0), (-1, -1), 1, colors.black)
            ]))
            story.append(analysis_table)
            story.append(Spacer(1, 12))

            # Summary if available
            if result.get('summary'):
                story.append(Paragraph("Summary:", styles['Heading4']))
                # Paragraph parses its text as markup; escape &, < and > so
                # free-form summary text cannot break PDF generation.
                story.append(Paragraph(escape(str(result['summary'])), styles['Normal']))
                story.append(Spacer(1, 12))

        # Build PDF
        doc.build(story)
        buffer.seek(0)

        # Keep only filename-safe characters from the path parameter in the header.
        safe_patient_id = re.sub(r'[^A-Za-z0-9_.-]', '_', str(patient_id))
        return StreamingResponse(
            buffer,
            media_type="application/pdf",
            headers={"Content-Disposition": f"attachment; filename=patient_{safe_patient_id}_analysis_reports.pdf"}
        )
    except HTTPException:
        # Keep deliberate HTTP errors (403 / 404 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error generating PDF report for patient {patient_id}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to generate PDF report: {str(e)}")
async def get_all_patients_analysis_reports_pdf(
    current_user: dict = Depends(get_current_user)
):
    """Generate PDF analysis reports for all patients.

    Every stored analysis result is loaded, grouped by ``patient_id`` and
    rendered as one section per patient.

    Args:
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role. The user's email is printed in the summary table.

    Returns:
        A ``StreamingResponse`` (``application/pdf``) whose filename carries
        the current date.

    Raises:
        HTTPException: 403 for insufficient role, 404 when no analysis results
            exist, 500 on unexpected errors.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can generate PDF reports")
        logger.info(f"Generating PDF analysis reports for all patients by {current_user['email']}")

        # Import database collections
        from db.mongo import db
        analysis_collection = db.patient_analysis_results

        # Find all analysis results
        analysis_results = await analysis_collection.find({}).to_list(length=None)
        if not analysis_results:
            raise HTTPException(status_code=404, detail="No analysis results found")

        # Create a simple PDF report
        from reportlab.lib.pagesizes import letter
        from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, Table, TableStyle
        from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
        from reportlab.lib.units import inch
        from reportlab.lib import colors
        from xml.sax.saxutils import escape
        import io

        # Create PDF buffer
        buffer = io.BytesIO()
        doc = SimpleDocTemplate(buffer, pagesize=letter)
        styles = getSampleStyleSheet()
        story = []

        # Title
        title_style = ParagraphStyle(
            'CustomTitle',
            parent=styles['Heading1'],
            fontSize=16,
            spaceAfter=30,
            alignment=1  # Center alignment
        )
        story.append(Paragraph("All Patients Analysis Reports", title_style))
        story.append(Spacer(1, 12))

        # Summary
        story.append(Paragraph("Summary", styles['Heading2']))
        story.append(Spacer(1, 12))
        summary_data = [
            ["Total Analysis Reports:", str(len(analysis_results))],
            ["Generated Date:", datetime.now().strftime("%Y-%m-%d %H:%M:%S")],
            ["Generated By:", str(current_user['email'])],
        ]
        summary_table = Table(summary_data, colWidths=[2*inch, 4*inch])
        summary_table.setStyle(TableStyle([
            ('BACKGROUND', (0, 0), (0, -1), colors.grey),
            ('TEXTCOLOR', (0, 0), (0, -1), colors.whitesmoke),
            ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
            ('FONTNAME', (0, 0), (-1, -1), 'Helvetica-Bold'),
            ('FONTSIZE', (0, 0), (-1, -1), 10),
            ('BOTTOMPADDING', (0, 0), (-1, 0), 12),
            ('BACKGROUND', (1, 0), (1, -1), colors.beige),
            ('GRID', (0, 0), (-1, -1), 1, colors.black)
        ]))
        story.append(summary_table)
        story.append(Spacer(1, 20))

        # Group results by patient
        patient_results = {}
        for result in analysis_results:
            patient_results.setdefault(result.get('patient_id', 'unknown'), []).append(result)

        # Patient Reports
        for patient_id, results in patient_results.items():
            # Paragraph parses its text as markup, so escape the stored id.
            story.append(Paragraph(f"Patient: {escape(str(patient_id))}", styles['Heading2']))
            story.append(Spacer(1, 12))

            for i, result in enumerate(results):
                # Risk Assessment. Stored values may be missing, null or of an
                # unexpected type, so coerce each one before formatting it.
                suicide_risk = result.get('suicide_risk')
                if not isinstance(suicide_risk, dict):
                    suicide_risk = {}
                risk_level = str(suicide_risk.get('level') or 'none')
                try:
                    risk_score = float(suicide_risk.get('score') or 0.0)
                except (TypeError, ValueError):
                    risk_score = 0.0
                risk_factors = suicide_risk.get('factors') or []
                if isinstance(risk_factors, str):
                    risk_factors = [risk_factors]

                story.append(Paragraph(f"Analysis #{i+1}", styles['Heading3']))
                story.append(Spacer(1, 6))
                analysis_data = [
                    ["Risk Level:", risk_level.upper()],
                    ["Risk Score:", f"{risk_score:.2f}"],
                    ["Risk Factors:", ", ".join(str(factor) for factor in risk_factors) if risk_factors else "None identified"],
                    # Timestamps may be stored as datetime objects; render them as text.
                    ["Analysis Date:", str(result.get('timestamp') or 'N/A')],
                ]
                analysis_table = Table(analysis_data, colWidths=[2*inch, 4*inch])
                analysis_table.setStyle(TableStyle([
                    ('BACKGROUND', (0, 0), (0, -1), colors.lightblue),
                    ('TEXTCOLOR', (0, 0), (0, -1), colors.black),
                    ('ALIGN', (0, 0), (-1, -1), 'LEFT'),
                    ('FONTNAME', (0, 0), (-1, -1), 'Helvetica'),
                    ('FONTSIZE', (0, 0), (-1, -1), 9),
                    ('BOTTOMPADDING', (0, 0), (-1, 0), 6),
                    ('BACKGROUND', (1, 0), (1, -1), colors.white),
                    ('GRID', (0, 0), (-1, -1), 1, colors.black)
                ]))
                story.append(analysis_table)
                story.append(Spacer(1, 12))

                # Summary if available
                if result.get('summary'):
                    story.append(Paragraph("Summary:", styles['Heading4']))
                    # Escape &, < and > so free-form summary text cannot
                    # break PDF generation.
                    story.append(Paragraph(escape(str(result['summary'])), styles['Normal']))
                    story.append(Spacer(1, 12))

            story.append(Spacer(1, 20))

        # Build PDF
        doc.build(story)
        buffer.seek(0)
        return StreamingResponse(
            buffer,
            media_type="application/pdf",
            headers={"Content-Disposition": f"attachment; filename=all_patients_analysis_reports_{datetime.now().strftime('%Y%m%d')}.pdf"}
        )
    except HTTPException:
        # Keep deliberate HTTP errors (403 / 404 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error generating PDF report for all patients: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to generate PDF report: {str(e)}")
# Voice synthesis endpoint
async def synthesize_voice(
    request: dict,
    current_user: dict = Depends(get_current_user)
):
    """Convert text to speech via ``text_to_speech``.

    NOTE(review): unlike the other voice handlers in this file, no
    doctor/admin role check is performed here - confirm this is intended.

    Args:
        request: Mapping with ``text`` (required), ``language`` (default
            ``"en-US"``) and ``return_format`` (default ``"mp3"``).
        current_user: Authenticated user; the email is logged.

    Returns:
        A ``StreamingResponse`` with media type ``audio/<return_format>``.

    Raises:
        HTTPException: 400 when ``text`` is empty or ``return_format`` is not
            a plain alphanumeric token, 500 on unexpected errors.
    """
    try:
        logger.info(f"Voice synthesis initiated by {current_user['email']}")

        # Extract parameters from request; explicit nulls fall back to the defaults
        text = request.get('text', '')
        language = str(request.get('language') or 'en-US')
        return_format = str(request.get('return_format') or 'mp3')
        if not text:
            raise HTTPException(status_code=400, detail="Text is required")

        # return_format is echoed into the Content-Type and Content-Disposition
        # headers, so only a plain alphanumeric token is accepted.
        if not re.fullmatch(r'[A-Za-z0-9]+', return_format):
            raise HTTPException(status_code=400, detail="Invalid return_format")

        # Convert language code for gTTS (e.g., 'en-US' -> 'en')
        language_code = language.split('-')[0]

        # Generate speech
        audio_data = text_to_speech(text, language=language_code)

        # Return audio data
        return StreamingResponse(
            io.BytesIO(audio_data),
            media_type=f"audio/{return_format}",
            headers={"Content-Disposition": f"attachment; filename=speech.{return_format}"}
        )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error in voice synthesis: {e}")
        raise HTTPException(status_code=500, detail="Error generating voice output")
# Notifications endpoints
async def get_notifications(current_user: dict = Depends(get_current_user)):
    """Get notifications for the current user.

    Args:
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role. Notifications are matched on the user's ``_id``.

    Returns:
        Up to 50 notification dicts, newest first, with ``id``, ``title``,
        ``message``, ``type``, ``read``, ``timestamp`` and ``patient_id``.

    Raises:
        HTTPException: 403 for insufficient role, 500 on unexpected errors.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can access notifications")
        logger.info(f"Fetching notifications for {current_user['email']}")

        # Import database collections
        from db.mongo import db
        notifications_collection = db.notifications

        # Get notifications for the current user
        notifications = await notifications_collection.find({
            "user_id": current_user.get('_id')
        }).sort("timestamp", -1).limit(50).to_list(length=50)

        return [
            {
                "id": str(notification["_id"]),
                "title": notification.get("title", ""),
                "message": notification.get("message", ""),
                "type": notification.get("type", "info"),
                "read": notification.get("read", False),
                "timestamp": notification.get("timestamp"),
                "patient_id": notification.get("patient_id")
            }
            for notification in notifications
        ]
    except HTTPException:
        # Keep deliberate HTTP errors (the 403 above) from being rewritten as 500.
        raise
    except Exception as e:
        logger.error(f"Error getting notifications: {e}")
        raise HTTPException(status_code=500, detail="Failed to get notifications")
async def mark_notification_read(
    notification_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Mark a notification as read.

    Only a notification owned by the current user (matched on ``user_id``)
    is updated.

    Args:
        notification_id: String form of the notification's ObjectId.
        current_user: Authenticated user; must hold the ``doctor`` or
            ``admin`` role.

    Returns:
        ``{"message": "Notification marked as read"}``.

    Raises:
        HTTPException: 403 for insufficient role, 400 when
            ``notification_id`` is not a valid ObjectId, 404 when no matching
            notification exists, 500 on unexpected errors.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can mark notifications as read")
        logger.info(f"Marking notification {notification_id} as read by {current_user['email']}")

        # A malformed id is a client error, not a server failure.
        try:
            notification_oid = ObjectId(notification_id)
        except (InvalidId, TypeError):
            raise HTTPException(status_code=400, detail="Invalid notification ID")

        # Import database collections
        from db.mongo import db
        notifications_collection = db.notifications

        # Update the notification
        result = await notifications_collection.update_one(
            {
                "_id": notification_oid,
                "user_id": current_user.get('_id')
            },
            {"$set": {"read": True, "read_at": datetime.utcnow()}}
        )
        if result.matched_count == 0:
            raise HTTPException(status_code=404, detail="Notification not found")
        return {"message": "Notification marked as read"}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error marking notification as read: {e}")
        raise HTTPException(status_code=500, detail="Failed to mark notification as read")
async def mark_all_notifications_read(current_user: dict = Depends(get_current_user)):
    """Mark all notifications as read for the current user.

    Args:
        current_user: User dict resolved by the ``get_current_user`` dependency;
            its ``roles``, ``email`` and ``_id`` entries are read here.

    Returns:
        A dict with a human-readable ``message`` and the ``modified_count``
        reported by the update.

    Raises:
        HTTPException: 403 when the user has neither the ``doctor`` nor the
            ``admin`` role, 500 for any other failure.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can mark notifications as read")
        logger.info(f"Marking all notifications as read for {current_user['email']}")

        # Import database collections
        from db.mongo import db
        notifications_collection = db.notifications

        # Update all unread notifications for the user
        result = await notifications_collection.update_many(
            {
                "user_id": current_user.get('_id'),
                "read": False
            },
            {"$set": {"read": True, "read_at": datetime.utcnow()}}
        )
        return {
            "message": f"Marked {result.modified_count} notifications as read",
            "modified_count": result.modified_count
        }
    except HTTPException:
        # Without this passthrough the 403 raised above would be caught by the
        # generic handler below and surface to the client as a 500.
        raise
    except Exception as e:
        logger.error(f"Error marking all notifications as read: {e}")
        raise HTTPException(status_code=500, detail="Failed to mark notifications as read")
| # Voice chat endpoint | |
async def voice_chat(
    audio: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    """Handle a voice chat turn: transcribe, reply, persist, and speak.

    The uploaded audio is transcribed, a text reply is built, the exchange is
    stored in the ``chats`` collection, and the reply is returned as audio.
    The reply is currently a fixed template that echoes the transcription;
    TxAgent is not invoked in this handler.

    Args:
        audio: Uploaded audio file to transcribe.
        current_user: User dict resolved by the ``get_current_user`` dependency;
            its ``roles``, ``email`` and ``_id`` entries are read here.

    Returns:
        A ``StreamingResponse`` with media type ``audio/mpeg`` carrying the
        bytes of the spoken reply.

    Raises:
        HTTPException: 403 when the user has neither the ``doctor`` nor the
            ``admin`` role, 500 for any unexpected failure.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can use voice features")
        logger.info(f"Voice chat initiated by {current_user['email']}")

        # Read audio file
        audio_data = await audio.read()

        # Transcribe audio to text. Failure is not fatal: a fallback sentence
        # is used as the transcription instead.
        try:
            transcription = recognize_speech(audio_data)
            # The recognizer result is accepted either as a dict with a
            # "transcription" key or as anything convertible to str.
            if isinstance(transcription, dict):
                transcription_text = transcription.get("transcription", "")
            else:
                transcription_text = str(transcription)
        except Exception as e:
            logger.error(f"Speech recognition failed: {e}")
            transcription_text = "Sorry, I couldn't understand the audio."

        # Generate response (for now, a simple response)
        response_text = f"I heard you say: '{transcription_text}'. How can I help you with patient care today?"

        # Store voice chat in the database. Best effort: a storage failure is
        # logged and must not prevent the reply from being returned.
        try:
            from db.mongo import db
            chats_collection = db.chats
            chat_entry = {
                "message": transcription_text,
                "response": response_text,
                "user_id": current_user.get('_id'),
                "user_email": current_user.get('email'),
                "timestamp": datetime.utcnow(),
                "chat_type": "voice_chat"
            }
            await chats_collection.insert_one(chat_entry)
            logger.info(f"Voice chat stored in database for user {current_user['email']}")
        except Exception as db_error:
            logger.error(f"Failed to store voice chat in database: {str(db_error)}")

        # Convert response to speech
        try:
            audio_response = text_to_speech(response_text, language="en")
        except Exception as e:
            logger.error(f"Text-to-speech failed: {e}")
            # NOTE(review): this fallback is plain text bytes but is still
            # served as audio/mpeg below - confirm clients can cope with it.
            audio_response = b"Sorry, I couldn't generate audio response."

        return StreamingResponse(
            io.BytesIO(audio_response),
            media_type="audio/mpeg",
            headers={"Content-Disposition": "attachment; filename=voice_response.mp3"}
        )
    except HTTPException:
        # Without this passthrough the 403 raised above would be caught by the
        # generic handler below and surface to the client as a 500.
        raise
    except Exception as e:
        logger.error(f"Error in voice chat: {e}")
        raise HTTPException(status_code=500, detail="Error processing voice chat")
| # Report analysis endpoint | |
async def analyze_report(
    file: UploadFile = File(...),
    current_user: dict = Depends(get_current_user)
):
    """Analyze an uploaded report (PDF, DOCX, etc.).

    Text is extracted according to the file extension. The ``analysis``
    section of the result currently holds placeholder findings and
    recommendations rather than a real analysis of the text.

    Args:
        file: Uploaded report; ``pdf``, ``docx`` and ``doc`` extensions are
            routed to a text extractor, everything else is reported as
            unsupported.
        current_user: User dict resolved by the ``get_current_user`` dependency;
            its ``roles`` and ``email`` entries are read here.

    Returns:
        A dict with ``file_name``, ``file_type``, ``extracted_text`` (cut to
        500 characters plus ``...`` when longer), ``analysis`` and an ISO
        ``timestamp``.

    Raises:
        HTTPException: 403 when the user has neither the ``doctor`` nor the
            ``admin`` role, 500 for any unexpected failure.
    """
    try:
        if not any(role in current_user.get('roles', []) for role in ['doctor', 'admin']):
            raise HTTPException(status_code=403, detail="Only doctors and admins can analyze reports")
        logger.info(f"Report analysis initiated by {current_user['email']}")

        # Read file content
        file_content = await file.read()
        # An upload may arrive without a filename; treat that as "no
        # extension" so it is reported as unsupported instead of crashing.
        file_extension = (file.filename or "").split('.')[-1].lower()

        # Extract text based on file type. Extraction failures are reported
        # inside the result text rather than failing the whole request.
        if file_extension == 'pdf':
            try:
                text_content = extract_text_from_pdf(file_content)
            except Exception as e:
                logger.error(f"PDF text extraction failed: {e}")
                text_content = "Failed to extract text from PDF"
        elif file_extension in ['docx', 'doc']:
            try:
                # Document is falsy when document support is unavailable
                # (presumably an optional import - verify at module top).
                if Document:
                    doc = Document(io.BytesIO(file_content))
                    text_content = '\n'.join([paragraph.text for paragraph in doc.paragraphs])
                else:
                    text_content = "Document processing not available"
            except Exception as e:
                logger.error(f"DOCX text extraction failed: {e}")
                text_content = "Failed to extract text from document"
        else:
            text_content = "Unsupported file format"

        # Analyze the content (for now, return a simple analysis)
        analysis_result = {
            "file_name": file.filename,
            "file_type": file_extension,
            "extracted_text": text_content[:500] + "..." if len(text_content) > 500 else text_content,
            "analysis": {
                "summary": f"Analyzed {file.filename} containing {len(text_content)} characters",
                "key_findings": ["Sample finding 1", "Sample finding 2"],
                "recommendations": ["Sample recommendation 1", "Sample recommendation 2"]
            },
            "timestamp": datetime.utcnow().isoformat()
        }
        return analysis_result
    except HTTPException:
        # Without this passthrough the 403 raised above would be caught by the
        # generic handler below and surface to the client as a 500.
        raise
    except Exception as e:
        logger.error(f"Error analyzing report: {e}")
        raise HTTPException(status_code=500, detail="Error analyzing report")
| # Patient deletion endpoint | |
async def delete_patient(
    patient_id: str,
    current_user: dict = Depends(get_current_user)
):
    """Delete a patient and all associated data.

    Removes the patient document (matched on ``fhir_id``) together with the
    analysis results, chats and notifications whose ``patient_id`` equals
    the given id.

    Args:
        patient_id: Identifier matched against ``fhir_id`` in the patients
            collection and ``patient_id`` in the related collections.
        current_user: User dict resolved by the ``get_current_user`` dependency;
            its ``roles`` and ``email`` entries are read here.

    Returns:
        A dict with a confirmation ``message``, the ``patient_id`` and an ISO
        ``deleted_at`` timestamp.

    Raises:
        HTTPException: 403 when the user lacks the ``admin`` role, 404 when
            no patient has that ``fhir_id``, 500 when a deletion or any other
            step fails.
    """
    try:
        if 'admin' not in current_user.get('roles', []):
            raise HTTPException(status_code=403, detail="Only administrators can delete patients")
        logger.info(f"Patient deletion initiated by {current_user['email']} for patient {patient_id}")

        # Import database collections
        from db.mongo import db
        patients_collection = db.patients
        analysis_collection = db.patient_analysis_results
        chats_collection = db.chats
        notifications_collection = db.notifications

        # Find the patient first
        patient = await patients_collection.find_one({"fhir_id": patient_id})
        if not patient:
            raise HTTPException(status_code=404, detail="Patient not found")

        # Delete all associated data
        try:
            # Remove dependent data first and the patient document last. If a
            # step fails midway the patient still exists, so the request can
            # simply be retried; deleting the patient first would make every
            # retry return 404 and leave the remaining data orphaned.
            await analysis_collection.delete_many({"patient_id": patient_id})
            await chats_collection.delete_many({"patient_id": patient_id})
            await notifications_collection.delete_many({"patient_id": patient_id})
            await patients_collection.delete_one({"fhir_id": patient_id})

            logger.info(f"Successfully deleted patient {patient_id} and all associated data")
            return {
                "message": f"Patient {patient.get('full_name', patient_id)} and all associated data deleted successfully",
                "patient_id": patient_id,
                "deleted_at": datetime.utcnow().isoformat()
            }
        except Exception as e:
            logger.error(f"Error during patient deletion: {e}")
            raise HTTPException(status_code=500, detail="Error deleting patient data")
    except HTTPException:
        # Pass deliberate HTTP errors (403/404/500 above) through unchanged.
        raise
    except Exception as e:
        logger.error(f"Error deleting patient {patient_id}: {e}")
        raise HTTPException(status_code=500, detail="Failed to delete patient")