Spaces:
Sleeping
Sleeping
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import subprocess | |
| import tempfile | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Any | |
| import aiofiles | |
| import aiohttp | |
| from fastapi import HTTPException, status | |
| from bson import ObjectId | |
| from db.mongo import patients_collection | |
| logger = logging.getLogger(__name__) | |
| class SyntheaIntegrationService: | |
| """ | |
| Service for integrating Synthea FHIR data generation with the CPS application | |
| """ | |
def __init__(self):
    """
    Configure directories and default generation settings.

    Detects whether the process runs in a container, probes for a Java
    runtime, picks the first writable base directory for the Synthea JAR,
    and prepares the FHIR output directory under the current working
    directory.
    """
    # A Docker marker file or the HF_SPACE_ID variable marks a containerized run
    # (like Hugging Face Spaces).
    self.is_containerized = os.path.exists('/.dockerenv') or os.environ.get('HF_SPACE_ID') is not None
    # Check if Java is available locally
    self.java_available = self._check_java_availability()
    # Always use real Synthea data - no fallback to mock data
    self.use_mock_data = False

    # Candidate base directories, in order of preference.
    candidate_dirs = [
        Path('/tmp'),                 # Standard temp directory
        Path('/app/tmp'),             # App-specific temp
        Path.cwd() / "tmp",           # Current working directory
        Path(tempfile.gettempdir()),  # System temp directory
    ]
    # os.access returns False for paths that do not exist, so no separate
    # existence check is needed.
    base_temp_dir = next((d for d in candidate_dirs if os.access(d, os.W_OK)), None)
    if base_temp_dir is None:
        # Fallback to current directory
        base_temp_dir = Path.cwd()
        logger.warning("⚠️ No writable temp directory found, using current directory")

    self.synthea_dir = base_temp_dir / "cps_synthea"
    # Use the actual output directory where Synthea creates files
    self.output_dir = Path.cwd() / "output" / "fhir"
    self.synthea_jar_path = self.synthea_dir / "synthea-with-dependencies.jar"

    # Create each directory independently so that a failure on one does not
    # prevent the other from being created.
    all_created = True
    for directory in (self.synthea_dir, self.output_dir):
        try:
            directory.mkdir(parents=True, exist_ok=True)
        except OSError as e:
            all_created = False
            logger.warning(f"⚠️ Could not create directories: {e}, will try to use existing paths")
    if all_created:
        logger.info(f"✅ Using directories: synthea={self.synthea_dir}, output={self.output_dir}")

    # Log the configuration
    logger.info("🚀 Using real Synthea generation (no fallback to mock data)")

    # Default Synthea configuration; callers may override individual keys.
    self.default_config = {
        "population": 10,
        "seed": 42,
        "age_min": 18,
        "age_max": 80,
        "gender": "both",  # male, female, both
        "location": "Massachusetts",  # Default location
        "modules": ["*"],  # All modules
        "exporter": "fhir",
        "exporter.fhir.transaction_bundle": "true",
        "exporter.fhir.include_patient_summary": "true",
        "exporter.fhir.include_encounters": "true",
        "exporter.fhir.include_medications": "true",
        "exporter.fhir.include_conditions": "true",
        "exporter.fhir.include_observations": "true",
        "exporter.fhir.include_procedures": "true",
        "exporter.fhir.include_immunizations": "true",
        "exporter.fhir.include_allergies": "true",
        "exporter.fhir.include_organizations": "false",
        "exporter.fhir.include_practitioners": "false",
    }
| def _check_java_availability(self) -> bool: | |
| """ | |
| Check if Java is available in the system | |
| """ | |
| try: | |
| import subprocess | |
| result = subprocess.run(['java', '-version'], | |
| capture_output=True, | |
| text=True, | |
| timeout=10) | |
| return result.returncode == 0 | |
| except (subprocess.TimeoutExpired, FileNotFoundError, Exception): | |
| return False | |
async def download_synthea(self) -> bool:
    """
    Download the Synthea JAR file if it is not already present.

    The response is streamed in chunks into a ``.part`` file next to the
    target and only moved to ``self.synthea_jar_path`` once the download has
    finished. An interrupted download therefore never leaves a truncated JAR
    that a later call would treat as valid.

    Returns:
        True when the JAR already exists or was downloaded successfully,
        False on a non-200 response or any error.
    """
    if self.synthea_jar_path.exists():
        logger.info("✅ Synthea JAR already exists")
        return True

    synthea_url = "https://github.com/synthetichealth/synthea/releases/download/master-branch-latest/synthea-with-dependencies.jar"
    partial_path = self.synthea_jar_path.with_name(self.synthea_jar_path.name + ".part")
    try:
        logger.info("📥 Downloading Synthea...")
        async with aiohttp.ClientSession() as session:
            async with session.get(synthea_url) as response:
                if response.status != 200:
                    logger.error(f"❌ Failed to download Synthea: {response.status}")
                    return False
                async with aiofiles.open(partial_path, 'wb') as f:
                    # Stream in 1 MiB chunks instead of buffering the whole JAR.
                    async for chunk in response.content.iter_chunked(1024 * 1024):
                        await f.write(chunk)
        # Publish the finished file under its final name.
        partial_path.replace(self.synthea_jar_path)
        logger.info("✅ Synthea downloaded successfully")
        return True
    except Exception as e:
        logger.error(f"❌ Error downloading Synthea: {str(e)}")
        # Best-effort cleanup of the incomplete file.
        try:
            partial_path.unlink()
        except OSError:
            pass
        return False
async def generate_synthea_config(self, config_overrides: Dict[str, Any] = None) -> str:
    """
    Write a Synthea properties file and return its path.

    The defaults in ``self.default_config`` are merged with
    ``config_overrides`` (overrides win). Each entry is written as a
    ``key = value`` line to ``synthea.properties`` inside ``self.synthea_dir``.
    """
    merged = dict(self.default_config)
    if config_overrides:
        merged.update(config_overrides)

    lines = []
    for key, value in merged.items():
        lines.append(f"{key} = {value}")

    target = self.synthea_dir / "synthea.properties"
    async with aiofiles.open(target, 'w') as handle:
        await handle.write("\n".join(lines))
    return str(target)
async def run_synthea_generation(self, config_file: str, population: int = 10) -> bool:
    """
    Run Synthea to generate FHIR data.

    Args:
        config_file: Path of the generated properties file. It is accepted
            for interface compatibility but is not passed to the Synthea
            command line.
        population: Value passed to Synthea's ``-p`` option.

    Returns:
        True when the Synthea process exits with status 0, False when Java is
        unavailable, the process cannot be started, or it exits non-zero.
    """

    def json_names(directory):
        """Return the names of the ``*.json`` files directly inside a directory."""
        return [f.name for f in directory.glob("*.json")]

    def log_output_diagnostics(working_dirs):
        """Log where JSON files ended up after a run; purely informational."""
        logger.info(f"🔍 Checking output directory immediately after generation: {self.output_dir}")
        if not self.output_dir.exists():
            logger.warning(f"⚠️ Output directory does not exist: {self.output_dir}")
            return
        all_files = list(self.output_dir.glob("*"))
        logger.info(f"📁 Files in output directory: {[f.name for f in all_files]}")
        subdirs = [d for d in self.output_dir.iterdir() if d.is_dir()]
        logger.info(f"📁 Subdirectories in output directory: {[d.name for d in subdirs]}")
        for subdir in subdirs:
            logger.info(f"📁 JSON files in {subdir.name}: {json_names(subdir)}")
        # NOTE(review): if output_dir already points at the "fhir" folder this
        # looks one level too deep and will always warn - confirm.
        fhir_dir = self.output_dir / "fhir"
        if fhir_dir.exists():
            logger.info(f"📁 JSON files in fhir subdirectory: {json_names(fhir_dir)}")
        else:
            logger.warning(f"⚠️ FHIR subdirectory does not exist: {fhir_dir}")
        logger.info(f"📁 JSON files in current working directory: {json_names(Path.cwd())}")
        logger.info(f"📁 JSON files in synthea directory: {json_names(self.synthea_dir)}")
        for working_dir in working_dirs:
            working_path = Path(working_dir)
            if not working_path.exists():
                continue
            logger.info(f"📁 JSON files in working directory {working_dir}: {json_names(working_path)}")
            for subdir in working_path.iterdir():
                if subdir.is_dir():
                    names = json_names(subdir)
                    if names:
                        logger.info(f"📁 JSON files in subdirectory {subdir}: {names}")

    try:
        logger.info("🚀 Starting Synthea generation...")

        # Check if Java is available
        try:
            java_check = await asyncio.create_subprocess_exec(
                "java", "-version",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            await java_check.communicate()
            if java_check.returncode != 0:
                logger.error("❌ Java is not available in the environment")
                return False
        except FileNotFoundError:
            logger.error("❌ Java is not installed or not in PATH")
            return False

        # Clear JSON files left over from a previous run
        for stale_file in self.output_dir.glob("*.json"):
            stale_file.unlink()

        # Use the parent directory of output/fhir as the output directory.
        # NOTE(review): confirm these flags against the Synthea CLI usage text.
        # Upstream documentation appears to describe `-o` as the
        # overflow-population switch and `-s` as the seed, with the output
        # location controlled by `--exporter.baseDirectory`. Left unchanged.
        output_parent = Path.cwd() / "output"
        cmd = [
            "java", "-jar", str(self.synthea_jar_path),
            "-p", str(population),
            "-o", str(output_parent.absolute()),
            "--seed", str(int(datetime.now().timestamp())),
            "--exporter.fhir.transaction_bundle=true",
            "--exporter.fhir.include_patient_summary=true",
            "--exporter.fhir.include_encounters=true",
            "--exporter.fhir.include_medications=true",
            "--exporter.fhir.include_conditions=true",
            "--exporter.fhir.include_observations=true",
            "--exporter.fhir.include_organizations=false",
            "--exporter.fhir.include_practitioners=false",
        ]
        logger.info(f"Running command: {' '.join(cmd)}")
        logger.info(f"Current working directory: {Path.cwd()}")
        logger.info(f"Output directory (absolute): {self.output_dir.absolute()}")
        logger.info(f"Output directory exists before generation: {self.output_dir.exists()}")

        # Try multiple working directories for better compatibility; the first
        # one in which the process can be started is used.
        working_dirs = [str(Path.cwd()), str(self.synthea_dir), str(self.output_dir)]
        process = None
        for working_dir in working_dirs:
            try:
                logger.info(f"🔄 Trying to run Synthea with working directory: {working_dir}")
                process = await asyncio.create_subprocess_exec(
                    *cmd,
                    stdout=asyncio.subprocess.PIPE,
                    stderr=asyncio.subprocess.PIPE,
                    cwd=working_dir,
                )
                break
            except Exception as e:
                logger.warning(f"⚠️ Failed to run with working directory {working_dir}: {e}")
        if process is None:
            logger.error("❌ Failed to create Synthea subprocess with any working directory")
            return False

        stdout, stderr = await process.communicate()
        if process.returncode != 0:
            # errors='ignore' drops undecodable bytes, so decoding cannot fail.
            error_output = stderr.decode('utf-8', errors='ignore')
            logger.error(f"❌ Synthea generation failed with return code {process.returncode}")
            logger.error(f"Error output: {error_output}")
            return False

        logger.info("✅ Synthea generation completed successfully")
        logger.info(f"Output: {stdout.decode('utf-8', errors='ignore')}")

        # Diagnostics are informational only: a failure while listing
        # directories must not turn a successful generation into a failure.
        try:
            log_output_diagnostics(working_dirs)
        except Exception as diag_error:
            logger.warning(f"⚠️ Could not inspect Synthea output directories: {diag_error}")
        return True
    except Exception as e:
        logger.error(f"❌ Error running Synthea: {str(e)}")
        return False
async def process_synthea_output(self, require_medical_data: bool = False) -> List[Dict[str, Any]]:
    """
    Process Synthea output files and convert them to the application format.

    Collects ``*.json`` files from a fixed list of directories and their
    immediate subdirectories, skipping hospital and practitioner files. Each
    file is parsed, patient data is extracted, and only patients that pass
    validation are kept.

    Args:
        require_medical_data: Forwarded to the completeness validation.

    Returns:
        The validated patient dictionaries, in sorted file-path order so
        repeated runs over the same files give the same ordering. An empty
        list is returned when nothing is found or on unexpected errors.
    """
    excluded_markers = ("hospitalInformation", "practitionerInformation")

    def patient_files_in(json_files):
        """Drop hospital/practitioner files from a list of JSON paths."""
        return [
            f for f in json_files
            if not any(marker in f.name for marker in excluded_markers)
        ]

    patients = []
    try:
        # List of directories to search for Synthea output
        search_dirs = [
            self.output_dir,                 # The actual fhir directory where files are created
            Path.cwd() / "output" / "fhir",  # Explicit path to fhir directory
            Path.cwd() / "output",           # Parent output directory
            Path.cwd(),                      # Current working directory
            Path('/tmp'),
            Path('/app'),
            Path('/app/tmp'),
        ]
        logger.info("🔍 Searching for Synthea output files in multiple directories...")

        # A set removes paths reached through more than one search directory.
        found = set()
        for search_dir in search_dirs:
            if not search_dir.exists():
                continue
            logger.info(f"🔍 Searching in: {search_dir}")
            try:
                json_files = list(search_dir.glob("*.json"))
                logger.info(f"📁 Found {len(json_files)} JSON files in {search_dir}")
                found.update(patient_files_in(json_files))

                # Also check immediate subdirectories
                subdirs = [d for d in search_dir.iterdir() if d.is_dir()]
                for subdir in subdirs:
                    try:
                        subdir_patient_files = patient_files_in(subdir.glob("*.json"))
                        found.update(subdir_patient_files)
                        logger.info(f"📁 Found {len(subdir_patient_files)} patient files in subdirectory {subdir}")
                    except Exception as subdir_error:
                        logger.warning(f"⚠️ Error accessing subdirectory {subdir}: {subdir_error}")
            except Exception as dir_error:
                logger.warning(f"⚠️ Error accessing directory {search_dir}: {dir_error}")

        # Sort for a deterministic processing order (set order is arbitrary).
        patient_files = sorted(found)
        logger.info(f"📁 Total patient files found: {len(patient_files)}")
        logger.info(f"📁 Patient file names: {[f.name for f in patient_files]}")
        if not patient_files:
            logger.error("❌ No patient files found in any directory")
            return []

        valid_patients = 0
        invalid_patients = 0
        for file_path in patient_files:
            try:
                logger.info(f"📄 Processing file: {file_path}")
                # JSON is read as UTF-8 rather than the locale default encoding.
                async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
                    content = await f.read()
                bundle = json.loads(content)
                patient_data = await self._extract_patient_data(bundle, file_path.name)
                if not patient_data:
                    logger.warning(f"⚠️ No patient data extracted from {file_path}")
                    continue
                if self._validate_patient_data_completeness(patient_data, require_medical_data):
                    patients.append(patient_data)
                    valid_patients += 1
                    logger.info(f"✅ Validated and extracted patient: {patient_data.get('full_name', 'Unknown')}")
                else:
                    invalid_patients += 1
                    logger.warning(f"❌ Patient validation failed: {patient_data.get('full_name', 'Unknown')}")
            except Exception as e:
                logger.error(f"❌ Error processing {file_path}: {str(e)}")

        logger.info(f"📊 Patient validation summary: {valid_patients} valid, {invalid_patients} invalid")
        logger.info(f"✅ Successfully processed {len(patients)} patients from Synthea output")
        return patients
    except Exception as e:
        logger.error(f"❌ Error processing Synthea output: {str(e)}")
        return []
| async def _extract_patient_data(self, bundle: Dict[str, Any], filename: str) -> Optional[Dict[str, Any]]: | |
| """ | |
| Extract patient data from FHIR bundle | |
| """ | |
| try: | |
| patient_data = {} | |
| conditions = [] | |
| medications = [] | |
| encounters = [] | |
| observations = [] | |
| procedures = [] | |
| immunizations = [] | |
| allergies = [] | |
| if 'entry' not in bundle: | |
| logger.warning(f"No entries found in bundle: {filename}") | |
| return None | |
| for entry in bundle.get('entry', []): | |
| resource = entry.get('resource', {}) | |
| resource_type = resource.get('resourceType') | |
| if resource_type == 'Patient': | |
| # Extract patient demographics | |
| name = resource.get('name', [{}])[0] if resource.get('name') else {} | |
| address = resource.get('address', [{}])[0] if resource.get('address') else {} | |
| patient_data = { | |
| 'fhir_id': resource.get('id'), | |
| 'full_name': f"{' '.join(name.get('given', []))} {name.get('family', '')}".strip(), | |
| 'gender': resource.get('gender', 'unknown'), | |
| 'date_of_birth': resource.get('birthDate'), | |
| 'address': ' '.join(address.get('line', [])) if address.get('line') else '', | |
| 'city': address.get('city', ''), | |
| 'state': address.get('state', ''), | |
| 'postal_code': address.get('postalCode', ''), | |
| 'country': address.get('country', ''), | |
| 'marital_status': resource.get('maritalStatus', {}).get('text', ''), | |
| 'language': self._extract_language(resource), | |
| 'source': 'synthea', | |
| 'import_date': datetime.utcnow().isoformat(), | |
| 'last_updated': datetime.utcnow().isoformat() | |
| } | |
| elif resource_type == 'Condition': | |
| conditions.append({ | |
| 'id': resource.get('id'), | |
| 'code': resource.get('code', {}).get('text', ''), | |
| 'status': resource.get('clinicalStatus', {}).get('text', ''), | |
| 'onset_date': resource.get('onsetDateTime'), | |
| 'recorded_date': resource.get('recordedDate'), | |
| 'verification_status': resource.get('verificationStatus', {}).get('text', ''), | |
| 'category': resource.get('category', [{}])[0].get('text', '') if resource.get('category') else '' | |
| }) | |
| elif resource_type == 'MedicationRequest': | |
| medications.append({ | |
| 'id': resource.get('id'), | |
| 'name': resource.get('medicationCodeableConcept', {}).get('text', ''), | |
| 'status': resource.get('status'), | |
| 'prescribed_date': resource.get('authoredOn'), | |
| 'requester': resource.get('requester', {}).get('display', ''), | |
| 'dosage': self._extract_dosage(resource), | |
| 'intent': resource.get('intent', ''), | |
| 'priority': resource.get('priority', '') | |
| }) | |
| elif resource_type == 'Encounter': | |
| encounters.append({ | |
| 'id': resource.get('id'), | |
| 'type': resource.get('type', [{}])[0].get('text', '') if resource.get('type') else '', | |
| 'status': resource.get('status'), | |
| 'period': resource.get('period', {}), | |
| 'service_provider': resource.get('serviceProvider', {}).get('display', ''), | |
| 'class': resource.get('class', {}).get('code', ''), | |
| 'reason': resource.get('reasonCode', [{}])[0].get('text', '') if resource.get('reasonCode') else '' | |
| }) | |
| elif resource_type == 'Observation': | |
| observations.append({ | |
| 'id': resource.get('id'), | |
| 'code': resource.get('code', {}).get('text', ''), | |
| 'value': self._extract_observation_value(resource), | |
| 'unit': resource.get('valueQuantity', {}).get('unit', ''), | |
| 'status': resource.get('status'), | |
| 'effective_date': resource.get('effectiveDateTime'), | |
| 'category': resource.get('category', [{}])[0].get('text', '') if resource.get('category') else '' | |
| }) | |
| elif resource_type == 'Procedure': | |
| procedures.append({ | |
| 'id': resource.get('id'), | |
| 'code': resource.get('code', {}).get('text', ''), | |
| 'status': resource.get('status'), | |
| 'performed_date': resource.get('performedDateTime'), | |
| 'performer': resource.get('performer', [{}])[0].get('actor', {}).get('display', '') if resource.get('performer') else '', | |
| 'reason': resource.get('reasonCode', [{}])[0].get('text', '') if resource.get('reasonCode') else '' | |
| }) | |
| elif resource_type == 'Immunization': | |
| immunizations.append({ | |
| 'id': resource.get('id'), | |
| 'vaccine': resource.get('vaccineCode', {}).get('text', ''), | |
| 'status': resource.get('status'), | |
| 'date': resource.get('occurrenceDateTime'), | |
| 'lot_number': resource.get('lotNumber', ''), | |
| 'expiration_date': resource.get('expirationDate', ''), | |
| 'manufacturer': resource.get('manufacturer', {}).get('display', '') | |
| }) | |
| elif resource_type == 'AllergyIntolerance': | |
| allergies.append({ | |
| 'id': resource.get('id'), | |
| 'substance': resource.get('code', {}).get('text', ''), | |
| 'status': resource.get('clinicalStatus', {}).get('text', ''), | |
| 'verification_status': resource.get('verificationStatus', {}).get('text', ''), | |
| 'type': resource.get('type', ''), | |
| 'category': resource.get('category', []), | |
| 'reaction': self._extract_allergy_reactions(resource) | |
| }) | |
| if patient_data: | |
| patient_data.update({ | |
| 'conditions': conditions, | |
| 'medications': medications, | |
| 'encounters': encounters, | |
| 'observations': observations, | |
| 'procedures': procedures, | |
| 'immunizations': immunizations, | |
| 'allergies': allergies | |
| }) | |
| return patient_data | |
| return None | |
| except Exception as e: | |
| logger.error(f"❌ Error extracting patient data from {filename}: {str(e)}") | |
| return None | |
| def _extract_language(self, patient_resource: Dict[str, Any]) -> str: | |
| """Extract language from patient resource""" | |
| try: | |
| communication = patient_resource.get('communication', []) | |
| if communication and communication[0].get('language'): | |
| return communication[0]['language'].get('text', 'English') | |
| return 'English' | |
| except: | |
| return 'English' | |
| def _extract_dosage(self, medication_resource: Dict[str, Any]) -> str: | |
| """Extract dosage information from medication resource""" | |
| try: | |
| dosage = medication_resource.get('dosageInstruction', []) | |
| if dosage and dosage[0].get('text'): | |
| return dosage[0]['text'] | |
| return '' | |
| except: | |
| return '' | |
| def _extract_observation_value(self, observation_resource: Dict[str, Any]) -> str: | |
| """Extract observation value""" | |
| try: | |
| if 'valueQuantity' in observation_resource: | |
| value = observation_resource['valueQuantity'].get('value', '') | |
| unit = observation_resource['valueQuantity'].get('unit', '') | |
| return f"{value} {unit}".strip() | |
| elif 'valueCodeableConcept' in observation_resource: | |
| return observation_resource['valueCodeableConcept'].get('text', '') | |
| elif 'valueString' in observation_resource: | |
| return observation_resource['valueString'] | |
| return '' | |
| except: | |
| return '' | |
| def _extract_allergy_reactions(self, allergy_resource: Dict[str, Any]) -> List[Dict[str, Any]]: | |
| """Extract allergy reactions""" | |
| try: | |
| reactions = allergy_resource.get('reaction', []) | |
| return [{ | |
| 'manifestation': reaction.get('manifestation', [{}])[0].get('text', '') if reaction.get('manifestation') else '', | |
| 'severity': reaction.get('severity', ''), | |
| 'onset': reaction.get('onset', '') | |
| } for reaction in reactions] | |
| except: | |
| return [] | |
def _validate_patient_data_completeness(self, patient_data: Dict[str, Any], require_medical_data: bool = True) -> bool:
    """
    Validate that a patient has all required data fields.

    Args:
        patient_data: The patient data to validate.
        require_medical_data: If True, the patient must have at least one
            condition, medication, encounter or observation.

    Returns:
        True when every required demographic field is present and non-empty,
        the gender is one of male/female/other/unknown, the date of birth
        parses as YYYY-MM-DD and (when required) medical data is present;
        False otherwise, including on unexpected errors.
    """
    try:
        patient_label = patient_data.get('full_name', 'Unknown')

        # Required basic fields - all must be present and not empty
        required_fields = (
            'full_name', 'gender', 'date_of_birth', 'address',
            'city', 'state', 'postal_code', 'country',
        )
        for field in required_fields:
            value = patient_data.get(field)
            if not value or (isinstance(value, str) and not value.strip()):
                logger.warning(f"⚠️ Missing or empty required field '{field}' for patient {patient_label}")
                return False

        # The loop above guarantees a non-empty name; only its type is left to check.
        if not isinstance(patient_data['full_name'], str):
            logger.warning("⚠️ Empty or invalid name for patient")
            return False

        if patient_data.get('gender') not in ('male', 'female', 'other', 'unknown'):
            logger.warning(f"⚠️ Invalid gender '{patient_data.get('gender')}' for patient {patient_label}")
            return False

        # date_of_birth is known to be non-empty here; verify its format.
        try:
            datetime.strptime(patient_data['date_of_birth'], '%Y-%m-%d')
        except (ValueError, TypeError):
            logger.warning(f"⚠️ Invalid date of birth format '{patient_data.get('date_of_birth')}' for patient {patient_label}")
            return False

        if require_medical_data:
            medical_sections = ('conditions', 'medications', 'encounters', 'observations')
            if not any(patient_data.get(section) for section in medical_sections):
                logger.warning(f"❌ Patient {patient_label} rejected: no medical data (conditions, medications, encounters, or observations)")
                return False
            logger.info(f"✅ Patient {patient_label} has medical data")

        logger.info(f"✅ Patient {patient_label} passed complete validation")
        return True
    except Exception as e:
        logger.error(f"❌ Error validating patient data: {str(e)}")
        return False
async def save_patients_to_database(self, patients: List[Dict[str, Any]]) -> Dict[str, Any]:
    """
    Save generated patients directly to the database.

    Each patient is inserted as its own document; a failure on one patient is
    recorded and does not stop the remaining inserts.

    Args:
        patients: Patient dictionaries to store.

    Returns:
        A dictionary with ``saved_count``, ``failed_count``, ``errors`` (one
        message per failed patient) and ``success`` (True when at least one
        patient was saved).
    """
    saved_count = 0
    failed_count = 0
    errors = []
    try:
        for patient in patients:
            try:
                # One timestamp per document so all four date fields agree.
                now = datetime.utcnow()
                patient_doc = {
                    '_id': ObjectId(),
                    # Also generate an id when fhir_id is present but empty/None.
                    'fhir_id': patient.get('fhir_id') or f"synthea-{ObjectId()}",
                    'full_name': patient.get('full_name', ''),
                    'gender': patient.get('gender', 'unknown'),
                    'date_of_birth': patient.get('date_of_birth'),
                    'address': patient.get('address', ''),
                    'city': patient.get('city', ''),
                    'state': patient.get('state', ''),
                    'postal_code': patient.get('postal_code', ''),
                    'country': patient.get('country', 'US'),
                    'marital_status': patient.get('marital_status', ''),
                    'language': patient.get('language', 'English'),
                    'source': patient.get('source', 'synthea'),
                    'status': 'active',
                    'created_at': now,
                    'updated_at': now,
                    'import_date': now,
                    'last_updated': now,
                    'conditions': patient.get('conditions', []),
                    'medications': patient.get('medications', []),
                    'encounters': patient.get('encounters', []),
                    'observations': patient.get('observations', []),
                    'procedures': patient.get('procedures', []),
                    'immunizations': patient.get('immunizations', []),
                    'allergies': patient.get('allergies', []),
                    'notes': [],
                }
                result = await patients_collection.insert_one(patient_doc)
            except Exception as e:
                failed_count += 1
                # Guard the lookup so a non-dict item cannot raise again here.
                patient_name = patient.get('full_name', 'Unknown') if isinstance(patient, dict) else 'Unknown'
                error_msg = f"Failed to save patient {patient_name}: {str(e)}"
                errors.append(error_msg)
                logger.error(f"❌ {error_msg}")
            else:
                saved_count += 1
                logger.info(f"✅ Saved patient {patient.get('full_name', 'Unknown')} with ID: {result.inserted_id}")

        return {
            "saved_count": saved_count,
            "failed_count": failed_count,
            "errors": errors,
            "success": saved_count > 0,
        }
    except Exception as e:
        logger.error(f"❌ Error saving patients to database: {str(e)}")
        return {
            "saved_count": 0,
            "failed_count": len(patients),
            "errors": [f"Database error: {str(e)}"],
            "success": False,
        }
async def generate_and_import_patients(
    self,
    population: int = 10,
    age_min: int = 18,
    age_max: int = 80,
    gender: str = "both",
    location: str = "Massachusetts",
    require_medical_data: bool = True
) -> Dict[str, Any]:
    """
    Complete workflow: generate Synthea data and store it in the database.

    Downloads Synthea if needed, writes the configuration file, runs the
    generation, processes the output, caps the result at ``population``
    patients and saves them.

    Args:
        population: Number of patients requested; also the maximum number of
            patients stored.
        age_min, age_max, gender, location: Written to the generated
            configuration file and echoed back in the result.
        require_medical_data: Forwarded to output processing; when True only
            patients with medical data are kept.

    Returns:
        A summary dictionary with counts, the stored patients, the
        configuration overrides and the output directory.

    Raises:
        HTTPException: With status 500 when any step fails.
    """
    try:
        logger.info(f"🎯 Starting Synthea generation for {population} patients")

        # Always use real Synthea - no fallback to mock data
        if not await self.download_synthea():
            logger.error("❌ Failed to download Synthea")
            raise RuntimeError("Synthea download failed")

        config_overrides = {
            "population": population,
            "age_min": age_min,
            "age_max": age_max,
            "gender": gender,
            "location": location,
        }
        config_file = await self.generate_synthea_config(config_overrides)

        if not await self.run_synthea_generation(config_file, population):
            logger.error("❌ Synthea generation failed")
            raise RuntimeError("Synthea generation failed")

        # Honour the caller's choice instead of always requiring medical data.
        patients = await self.process_synthea_output(require_medical_data=require_medical_data)
        if not patients:
            logger.error("❌ No patients with complete data generated from Synthea")
            raise RuntimeError("No patients with complete data generated from Synthea")

        # Cap at the requested population rather than a fixed 10.
        if population > 0 and len(patients) > population:
            logger.info(f"📊 Limiting from {len(patients)} to {population} patients with complete data")
            patients = patients[:population]
        logger.info(f"📊 Final patient count for database storage: {len(patients)}")

        db_result = await self.save_patients_to_database(patients)
        logger.info(f"💾 Database save result: {db_result}")

        # NOTE(review): status stays "success" even when saved_count is 0;
        # callers should inspect "saved_to_database" / "database_errors".
        return {
            "status": "success",
            "generated_patients": len(patients),
            "saved_to_database": db_result["saved_count"],
            "failed_to_save": db_result["failed_count"],
            "database_errors": db_result["errors"],
            "patients": patients,
            "config": config_overrides,
            "output_directory": str(self.output_dir),
            "source": "synthea_real",
            "message": f"Successfully stored {db_result['saved_count']} patients with complete data to database",
        }
    except Exception as e:
        # No fallback to mock data - surface the error to the API caller.
        logger.error(f"❌ Synthea integration failed: {str(e)}")
        failure = f"Synthea generation failed: {str(e)}"
        logger.error(f"❌ Error in generate_and_import_patients: {failure}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Patient generation failed: {failure}"
        ) from e
async def get_synthea_statistics(self) -> Dict[str, Any]:
    """
    Report Synthea availability and counts of generated output files.

    The result states whether the service runs containerized, whether the
    working directories exist and are writable, whether ``java -version``
    succeeds, whether the Synthea JAR exists, and how many JSON files (split
    into patient / hospital / practitioner) sit in the output directory and
    its immediate subdirectories, with their combined size in MB.

    On an unexpected error a zeroed dictionary with an ``error`` message and
    environment "unknown" is returned.
    """
    try:
        stats = {
            "total_files": 0,
            "patient_files": 0,
            "hospital_files": 0,
            "practitioner_files": 0,
            "total_size_mb": 0,
            "synthea_available": False,
            "java_available": False,
            "directories_accessible": False,
            "environment": "local"
        }

        def tally(json_path):
            """Add one JSON file to the counters and the size total."""
            stats["total_files"] += 1
            stats["total_size_mb"] += json_path.stat().st_size / (1024 * 1024)
            if "hospitalInformation" in json_path.name:
                stats["hospital_files"] += 1
            elif "practitionerInformation" in json_path.name:
                stats["practitioner_files"] += 1
            else:
                stats["patient_files"] += 1

        # Environment info
        stats["environment"] = "containerized" if self.is_containerized else "local"

        # Both directories must exist and be writable
        try:
            dirs_ok = (
                self.synthea_dir.exists() and
                self.output_dir.exists() and
                os.access(self.synthea_dir, os.W_OK) and
                os.access(self.output_dir, os.W_OK)
            )
            stats["directories_accessible"] = dirs_ok
        except Exception:
            stats["directories_accessible"] = False

        # Java is considered available when `java -version` exits with 0
        try:
            probe = await asyncio.create_subprocess_exec(
                "java", "-version",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            await probe.communicate()
            stats["java_available"] = (probe.returncode == 0)
        except FileNotFoundError:
            stats["java_available"] = False

        # Synthea is considered available when the JAR file exists
        stats["synthea_available"] = self.synthea_jar_path.exists()

        # Count files only when the output directory exists and is accessible
        if stats["directories_accessible"] and self.output_dir.exists():
            try:
                # Files directly in the output directory first ...
                for json_path in self.output_dir.glob("*.json"):
                    tally(json_path)
                # ... then files one level down
                for child in self.output_dir.iterdir():
                    if not child.is_dir():
                        continue
                    for json_path in child.glob("*.json"):
                        tally(json_path)
            except Exception as file_error:
                logger.warning(f"⚠️ Error accessing output directory files: {file_error}")

        return stats
    except Exception as e:
        logger.error(f"❌ Error getting patient generation statistics: {str(e)}")
        return {
            "error": str(e),
            "total_files": 0,
            "patient_files": 0,
            "hospital_files": 0,
            "practitioner_files": 0,
            "total_size_mb": 0,
            "synthea_available": False,
            "java_available": False,
            "directories_accessible": False,
            "environment": "unknown"
        }