cps-api-tx / api /__init__.py
Ali2206's picture
teest
6ed180c
from fastapi import APIRouter
import logging
# Logger for this package, named after the module so records show their origin.
logger = logging.getLogger(__name__)

# Root logging configuration: INFO threshold with a
# "timestamp - level - logger name - message" record layout.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(name)s - %(message)s',
    level=logging.INFO,
)
def get_router() -> APIRouter:
    """Build the top-level API router and attach every feature sub-router.

    The route modules are imported when this function runs rather than at
    module import time.

    Mounting (registration order is preserved deliberately):
      * patients      -- no prefix; its routes already carry ``/patients``
      * auth          -- under ``/auth``
      * pdf           -- under ``/patients``
      * appointments  -- under ``/appointments``
      * messaging     -- no prefix

    The TxAgent endpoints (``/chat-stream``, ``/voice/synthesize`` and
    ``/analyze-report``) are intentionally not registered here: a separate
    TxAgent API is used for them instead.

    Returns:
        APIRouter: a new router with all sub-routers included.
    """
    # Import sub-modules from the routes directory
    from .routes.auth import auth
    from .routes.patients import patients
    from .routes.pdf import pdf
    from .routes.appointments import router as appointments
    from .routes.messaging import router as messaging

    router = APIRouter()

    # Include sub-routers with correct prefixes
    # No prefix for patients: its routes already start with /patients.
    router.include_router(patients, tags=["patients"])
    router.include_router(auth, prefix="/auth", tags=["auth"])
    # PDF routes do need the /patients prefix.
    router.include_router(pdf, prefix="/patients", tags=["pdf"])
    router.include_router(appointments, prefix="/appointments", tags=["appointments"])
    router.include_router(messaging, tags=["messaging"])

    return router
# Export the router: built once, when this package is first imported, by
# calling get_router(); presumably mounted by the application entry point
# (not visible in this file -- confirm).
api_router = get_router()