Spaces:
Sleeping
Sleeping
| import re | |
| import hashlib | |
| import io | |
| import json | |
| from datetime import datetime | |
| from typing import Dict, List, Tuple | |
| from bson import ObjectId | |
| import logging | |
| logger = logging.getLogger(__name__) | |
def clean_text_response(text: str) -> str:
    """Clean and format a text response.

    Removes the markdown emphasis markers ``**`` and ``__``, collapses every
    run of blank (whitespace-only) lines into a single blank line, collapses
    runs of spaces into one space, and strips leading/trailing whitespace.

    Args:
        text: Raw response text.

    Returns:
        The cleaned text; "" when ``text`` is empty or None.
    """
    if not text:
        return ""
    # Strip the emphasis markers first: removing them after the whitespace
    # pass would leave double spaces behind (e.g. "a ** b" -> "a  b").
    text = text.replace("**", "").replace("__", "")
    text = re.sub(r'\n\s*\n', '\n\n', text)
    text = re.sub(r'[ ]+', ' ', text)
    return text.strip()
def extract_section(text: str, heading: str) -> str:
    """Extract the body of the ``heading:`` section from *text*.

    The heading is matched case-insensitively and must be followed by a colon
    and a line break. The section runs until the next line that starts with an
    uppercase letter and contains a colon (taken to be the next heading), or
    until the end of the text.

    Args:
        text: Text to search.
        heading: Heading label, without the trailing colon.

    Returns:
        The stripped section body, or "" when the heading is not found or
        either argument is not a string.
    """
    if not isinstance(text, str) or not isinstance(heading, str):
        logger.error("Section extraction failed for heading '%s': expected string input", heading)
        return ""
    # Case-insensitivity is scoped to the heading only. Applying IGNORECASE to
    # the whole pattern would make [A-Z] in the lookahead match lowercase
    # letters too, so a body line such as "note: ..." would end the section.
    pattern = rf"(?i:{re.escape(heading)}):\s*\n(.*?)(?=\n[A-Z][^\n]*:|\Z)"
    match = re.search(pattern, text, re.DOTALL)
    return match.group(1).strip() if match else ""
def structure_medical_response(text: str) -> Dict:
    """Structure a free-text medical response into its four sections.

    Markdown emphasis markers (``**`` and ``__``) are removed first. Each
    section is looked up under a primary heading and, when that yields
    nothing, under an alternative heading. Headings are matched
    case-insensitively; a section ends at the first blank line or at the end
    of the text. Leading ``-``/``*`` bullet markers are removed from every
    line of a section.

    Args:
        text: Raw response text.

    Returns:
        A dict with the keys "summary", "risks", "missed_issues" and
        "recommendations". A value is "" when its section is not found;
        non-string or empty input yields "" for every key.
    """
    section_headings = (
        ("summary", ("Summary of Patient's Medical History",
                     "Summarize the patient's medical history")),
        ("risks", ("Identify Risks or Red Flags",
                   "Risks or Red Flags")),
        ("missed_issues", ("Missed Diagnoses or Treatments",
                           "What the doctor might have missed")),
        ("recommendations", ("Suggest Next Clinical Steps",
                             "Suggested Clinical Actions")),
    )
    if not isinstance(text, str) or not text:
        return {key: "" for key, _ in section_headings}

    # A section stops at the first blank line or at the end of the text.
    section_end = r"(?=\n\s*\n|\Z)"

    def extract_improved(body: str, heading: str) -> str:
        escaped = re.escape(heading)
        # Only two layouts are needed: "Heading:<newline>body" and
        # "Heading<whitespace or dashes>body". A "**Heading**:" variant is
        # not needed because emphasis markers are stripped before matching,
        # and a "<newline>Heading<newline>body" variant is already covered by
        # the whitespace/dash layout.
        patterns = (
            rf"{escaped}:\s*\n(.*?){section_end}",
            rf"{escaped}[\s\-]+(.*?){section_end}",
        )
        for pattern in patterns:
            match = re.search(pattern, body, re.DOTALL | re.IGNORECASE)
            if match:
                content = match.group(1).strip()
                # Drop a leading bullet marker from each line.
                return re.sub(r'^\s*[\-\*]\s*', '', content, flags=re.MULTILINE)
        return ""

    plain = text.replace('**', '').replace('__', '')
    structured = {}
    for key, (primary, fallback) in section_headings:
        structured[key] = extract_improved(plain, primary) or extract_improved(plain, fallback)
    return structured
def serialize_patient(patient: dict) -> dict:
    """Return a shallow copy of ``patient`` whose "_id" value is a string.

    The input mapping is not modified. Only a top-level "_id" entry is
    converted (via ``str``); all other entries are copied unchanged.
    """
    serialized = patient.copy()
    if "_id" not in serialized:
        return serialized
    serialized["_id"] = str(serialized["_id"])
    return serialized
def compute_patient_data_hash(data: dict) -> str:
    """Return the SHA-256 hex digest of ``data`` for change detection.

    The data is dumped to JSON with sorted keys, so dicts that differ only in
    key order hash identically. ``datetime`` values are written in ISO format
    and ``ObjectId`` values as their string form; any other type JSON cannot
    encode raises ``TypeError``.
    """

    def encode_special(value):
        # Fallback used by json.dumps for values it cannot encode natively.
        if isinstance(value, datetime):
            return value.isoformat()
        if isinstance(value, ObjectId):
            return str(value)
        raise TypeError(
            f"Object of type {value.__class__.__name__} is not JSON serializable"
        )

    canonical = json.dumps(data, sort_keys=True, default=encode_special)
    digest = hashlib.sha256(canonical.encode())
    return digest.hexdigest()
def compute_file_content_hash(file_content: bytes) -> str:
    """Return the SHA-256 hex digest of the given file bytes."""
    hasher = hashlib.sha256()
    hasher.update(file_content)
    return hasher.hexdigest()
def create_notification(user_id: str, title: str, message: str, notification_type: str = "info", patient_id: str = None) -> dict:
    """Build a new, unread notification record.

    Returns a dict with the keys "user_id", "title", "message", "type",
    "read" (always False), "timestamp" (the current naive UTC time from
    ``datetime.utcnow()``) and "patient_id" (None unless given).
    """
    notification = dict(
        user_id=user_id,
        title=title,
        message=message,
        type=notification_type,
        read=False,
        timestamp=datetime.utcnow(),
        patient_id=patient_id,
    )
    return notification
# Accepted risk level spellings mapped to their canonical names.
_RISK_LEVEL_ALIASES = {
    'low': 'low',
    'medium': 'moderate',
    'moderate': 'moderate',
    'high': 'high',
    'severe': 'severe',
    'critical': 'severe',
    'none': 'none',
    'unknown': 'none',
}


def format_risk_level(risk_level: str) -> str:
    """Normalize a risk level name to its canonical form.

    Matching ignores case and surrounding whitespace.

    Args:
        risk_level: Risk level label such as "Medium" or "critical".

    Returns:
        One of 'low', 'moderate', 'high', 'severe' or 'none'. Unrecognized
        labels and non-string input (including None) map to 'none'.
    """
    if not isinstance(risk_level, str):
        # Treat a missing value like any other unrecognized level rather
        # than failing on .lower().
        return 'none'
    return _RISK_LEVEL_ALIASES.get(risk_level.strip().lower(), 'none')