cps-api-tx / utils.py
Ali2206's picture
teest
6ed180c
import hashlib
import io
import json
import logging
import re
from datetime import datetime, timezone
from typing import Dict, List, Optional, Tuple

from bson import ObjectId
logger = logging.getLogger(__name__)
def clean_text_response(text: str) -> str:
    """Tidy up a text response.

    Runs of line breaks separated only by whitespace become a single blank
    line, repeated spaces become one space, the Markdown emphasis markers
    ``**`` and ``__`` are removed, and the result is stripped of leading and
    trailing whitespace.
    """
    cleaned = re.sub(r'\n\s*\n', '\n\n', text)
    cleaned = re.sub(r'[ ]+', ' ', cleaned)
    # Markers are removed after whitespace collapsing, in this order.
    for marker in ("**", "__"):
        cleaned = cleaned.replace(marker, "")
    return cleaned.strip()
def extract_section(text: str, heading: str) -> str:
    """Extract the body of a ``<heading>:`` section from free-form text.

    The heading is matched case-insensitively and must be followed by a
    colon and a line break. The body runs until the next line that starts
    with an uppercase ASCII letter and contains a colon (treated as the next
    heading), or until the end of the text.

    Args:
        text: Text to search.
        heading: Heading label without the trailing colon. It is matched
            literally; regex metacharacters are escaped.

    Returns:
        The section body with surrounding whitespace stripped, or ``""``
        when the heading is not found or extraction fails.
    """
    try:
        # Case-insensitivity is scoped to the heading with (?i:...). Passing
        # re.IGNORECASE for the whole pattern would make [A-Z] in the
        # lookahead match lowercase letters too, so any "word: value" line
        # inside the section would be mistaken for the next heading.
        pattern = rf"(?i:{re.escape(heading)}):\s*\n(.*?)(?=\n[A-Z][^\n]*:|\Z)"
        match = re.search(pattern, text, re.DOTALL)
        return match.group(1).strip() if match else ""
    except Exception as e:
        # Deliberate best-effort: a bad input is logged and reported as
        # "no section" rather than propagated to the caller.
        logger.error("Section extraction failed for heading '%s': %s", heading, e)
        return ""
def structure_medical_response(text: str) -> Dict:
    """Split a free-text medical analysis into four named sections.

    The Markdown emphasis markers ``**`` and ``__`` are removed first. Each
    output field is then looked up under a primary heading and, when that
    yields nothing, under an alternative heading.

    Returns:
        A dict with the keys ``summary``, ``risks``, ``missed_issues`` and
        ``recommendations``. A value is ``""`` when neither of its headings
        produced any content.
    """

    def _section(body: str, heading: str) -> str:
        """Return the text under ``heading``, up to a blank line or the end."""
        # Heading layouts, tried in this order; the first one that matches wins.
        layouts = (
            rf"{re.escape(heading)}:\s*\n(.*?)(?=\n\s*\n|\Z)",
            rf"\*\*{re.escape(heading)}\*\*:\s*\n(.*?)(?=\n\s*\n|\Z)",
            rf"{re.escape(heading)}[\s\-]+(.*?)(?=\n\s*\n|\Z)",
            rf"\n{re.escape(heading)}\s*\n(.*?)(?=\n\s*\n|\Z)",
        )
        attempts = (
            re.search(layout, body, re.DOTALL | re.IGNORECASE)
            for layout in layouts
        )
        hit = next((found for found in attempts if found), None)
        if hit is None:
            return ""
        # Drop a leading "-" or "*" bullet marker from each line.
        return re.sub(
            r'^\s*[\-\*]\s*', '', hit.group(1).strip(), flags=re.MULTILINE
        )

    plain = text.replace('**', '').replace('__', '')

    # Output key -> (primary heading, alternative heading).
    headings = {
        "summary": (
            "Summary of Patient's Medical History",
            "Summarize the patient's medical history",
        ),
        "risks": (
            "Identify Risks or Red Flags",
            "Risks or Red Flags",
        ),
        "missed_issues": (
            "Missed Diagnoses or Treatments",
            "What the doctor might have missed",
        ),
        "recommendations": (
            "Suggest Next Clinical Steps",
            "Suggested Clinical Actions",
        ),
    }
    return {
        key: _section(plain, primary) or _section(plain, alternative)
        for key, (primary, alternative) in headings.items()
    }
def serialize_patient(patient: dict) -> dict:
    """Return a shallow copy of ``patient`` whose ``_id`` is a string.

    The input mapping is not modified. Only a top-level ``_id`` key, when
    present, is converted with ``str()``; all other values are carried over
    unchanged.
    """
    serialized = patient.copy()
    if "_id" not in serialized:
        return serialized
    serialized["_id"] = str(serialized["_id"])
    return serialized
def compute_patient_data_hash(data: dict) -> str:
    """Return a SHA-256 hex digest of ``data`` for change detection.

    The data is serialized to JSON with sorted keys before hashing, so the
    insertion order of dict keys does not affect the digest. ``datetime``
    values are encoded with ``isoformat()`` and ``ObjectId`` values with
    ``str()``.
    """

    class _PatientJSONEncoder(json.JSONEncoder):
        """JSON encoder that also accepts datetime and ObjectId values."""

        def default(self, o):
            if isinstance(o, datetime):
                return o.isoformat()
            if isinstance(o, ObjectId):
                return str(o)
            # Any other type is handed to the base implementation.
            return super().default(o)

    canonical_json = json.dumps(data, cls=_PatientJSONEncoder, sort_keys=True)
    hasher = hashlib.sha256()
    hasher.update(canonical_json.encode())
    return hasher.hexdigest()
def compute_file_content_hash(file_content: bytes) -> str:
    """Return the SHA-256 digest of ``file_content`` as a hex string."""
    hasher = hashlib.sha256()
    hasher.update(file_content)
    return hasher.hexdigest()
def create_notification(
    user_id: str,
    title: str,
    message: str,
    notification_type: str = "info",
    patient_id: Optional[str] = None,
) -> dict:
    """Build a new, unread notification record.

    Args:
        user_id: Stored under ``user_id``.
        title: Stored under ``title``.
        message: Stored under ``message``.
        notification_type: Stored under ``type``; defaults to ``"info"``.
        patient_id: Stored under ``patient_id``; ``None`` when omitted.

    Returns:
        A dict with the keys ``user_id``, ``title``, ``message``, ``type``,
        ``read`` (always ``False``), ``timestamp`` (the current UTC time as
        a naive ``datetime``) and ``patient_id``.
    """
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the value stays a naive UTC datetime, which is
    # exactly what utcnow() produced.
    created_at = datetime.now(timezone.utc).replace(tzinfo=None)
    return {
        "user_id": user_id,
        "title": title,
        "message": message,
        "type": notification_type,
        "read": False,
        "timestamp": created_at,
        "patient_id": patient_id,
    }
# Accepted spellings (lowercase) mapped to the canonical risk level names.
_RISK_LEVEL_ALIASES = {
    'low': 'low',
    'medium': 'moderate',
    'moderate': 'moderate',
    'high': 'high',
    'severe': 'severe',
    'critical': 'severe',
    'none': 'none',
    'unknown': 'none',
}


def format_risk_level(risk_level: str) -> str:
    """Normalize a risk level name to its canonical form.

    Matching ignores letter case and surrounding whitespace.

    Args:
        risk_level: Raw risk level label, e.g. ``"Medium"`` or ``"CRITICAL"``.

    Returns:
        One of ``'low'``, ``'moderate'``, ``'high'``, ``'severe'`` or
        ``'none'``. Unrecognized labels, ``None`` and other non-string
        values all yield ``'none'``.
    """
    # A missing or non-string value gets the same fallback as an unknown
    # label instead of raising AttributeError on .lower().
    if not isinstance(risk_level, str):
        return 'none'
    # Strip first so padded values such as "High\n" are still recognized
    # rather than silently downgraded to 'none'.
    return _RISK_LEVEL_ALIASES.get(risk_level.strip().lower(), 'none')