Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, Depends, HTTPException, status, Query | |
| from typing import List, Optional | |
| from datetime import date, time, datetime, timedelta | |
| from bson import ObjectId | |
| from motor.motor_asyncio import AsyncIOMotorClient | |
| import logging | |
| from core.security import get_current_user | |
| from db.mongo import db, patients_collection | |
| from models.schemas import ( | |
| AppointmentCreate, AppointmentUpdate, AppointmentResponse, AppointmentListResponse, | |
| AppointmentStatus, AppointmentType, DoctorAvailabilityCreate, DoctorAvailabilityUpdate, | |
| DoctorAvailabilityResponse, AvailableSlotsResponse, AppointmentSlot, DoctorListResponse | |
| ) | |
| logger = logging.getLogger(__name__) | |
| router = APIRouter(tags=["appointments"]) | |
| # --- HELPER FUNCTIONS --- | |
def is_valid_object_id(id_str: str) -> bool:
    """Return True when ``id_str`` can be parsed as a BSON ObjectId.

    Delegates to ``ObjectId.is_valid`` rather than constructing an ObjectId
    under a bare ``except``: the constructor treats ``None`` as "generate a
    new id", so the old check reported ``None`` as valid, and the bare
    ``except`` also swallowed unrelated exceptions such as KeyboardInterrupt.

    Args:
        id_str: Candidate identifier, normally a 24-character hex string.

    Returns:
        True if the value is a well-formed ObjectId, False otherwise
        (including ``None`` and the empty string).
    """
    return ObjectId.is_valid(id_str)
async def assign_patient_to_doctor_in_collection(patient_id: str, doctor_id: str, db_client: AsyncIOMotorClient, appointment_data: dict = None):
    """Create or update a patient's record in the patients collection and assign it to a doctor.

    The patient is looked up in the users collection; a matching record in the
    patients collection (matched on ``fhir_id``, falling back to the user's
    ``_id`` as a string) is updated if present, otherwise a new one is inserted.

    Args:
        patient_id: String id of the patient's document in the users collection.
        doctor_id: Id of the doctor; stored stringified as ``assigned_doctor_id``.
        db_client: Database handle used to read the users collection.
        appointment_data: Optional dict whose ``patient_*`` keys take precedence
            over the values stored on the user document. ``None`` is accepted
            and treated as "no extra data".

    Returns:
        True when the record was inserted or updated. False when the patient
        does not exist or any error occurred (errors are logged, not raised).
    """
    # Callers pass None when there is no appointment payload. Without this
    # normalisation every .get() below raised AttributeError, which the broad
    # except turned into a silent "False" for all such calls.
    appointment_data = appointment_data or {}

    try:
        # Get patient details from users collection
        patient = await db_client.users.find_one({"_id": ObjectId(patient_id)})
        if not patient:
            logger.warning(f"Patient {patient_id} not found in users collection")
            return False

        fhir_id = patient.get("fhir_id") or str(patient["_id"])
        now = datetime.now()

        # Check if patient already exists in patients collection
        existing_patient = await patients_collection.find_one({"fhir_id": fhir_id})

        def pick(field):
            """Prefer the appointment-supplied value, else the stored user value."""
            return appointment_data.get(f"patient_{field}") or patient.get(field)

        # NOTE(review): on the update path the placeholder date_of_birth/gender
        # below can overwrite values already stored on the patient record.
        patient_data = {
            "fhir_id": fhir_id,
            "full_name": appointment_data.get("patient_full_name") or patient.get("full_name", ""),
            "date_of_birth": pick("date_of_birth") or now.strftime('%Y-%m-%d'),
            "gender": pick("gender") or "unknown",
            "address": pick("address"),
            "city": pick("city"),
            "state": pick("state"),
            "postal_code": pick("postal_code"),
            "country": pick("country"),
            "national_id": pick("national_id"),
            "blood_type": pick("blood_type"),
            "allergies": appointment_data.get("patient_allergies", []) or patient.get("allergies", []),
            "chronic_conditions": appointment_data.get("patient_chronic_conditions", []) or patient.get("chronic_conditions", []),
            "medications": appointment_data.get("patient_medications", []) or patient.get("medications", []),
            "emergency_contact_name": pick("emergency_contact_name"),
            "emergency_contact_phone": pick("emergency_contact_phone"),
            "insurance_provider": pick("insurance_provider"),
            "insurance_policy_number": pick("insurance_policy_number"),
            "source": "direct",  # Patient came through appointment booking
            "status": "active",
            "assigned_doctor_id": str(doctor_id),
            "registration_date": now,
            "created_at": now,
            "updated_at": now,
        }

        # Free-text medical notes are wrapped in a single note entry.
        if appointment_data.get("patient_medical_notes"):
            patient_data["notes"] = [{
                "content": appointment_data["patient_medical_notes"],
                "type": "medical_notes",
                "created_at": now,
                "source": "appointment_creation",
            }]

        # Remaining clinical payloads are copied through when non-empty.
        clinical_fields = (
            ("patient_symptoms", "symptoms"),
            ("patient_vital_signs", "vital_signs"),
            ("patient_lab_results", "lab_results"),
            ("patient_imaging_results", "imaging_results"),
            ("patient_previous_encounters", "encounters"),
        )
        for source_key, target_key in clinical_fields:
            if appointment_data.get(source_key):
                patient_data[target_key] = appointment_data[source_key]

        if existing_patient:
            # Identity and creation timestamps are never rewritten on update.
            immutable_keys = {"fhir_id", "created_at", "registration_date"}
            update_data = {
                "assigned_doctor_id": str(doctor_id),
                "updated_at": now,
            }
            for key, value in patient_data.items():
                if key in immutable_keys or value is None:
                    continue
                if isinstance(value, list) and not value:
                    # Empty lists carry no information; keep what is stored.
                    continue
                update_data[key] = value

            await patients_collection.update_one(
                {"_id": existing_patient["_id"]},
                {"$set": update_data}
            )
            logger.info(f"✅ Patient {patient_data['full_name']} updated and reassigned to doctor {doctor_id}")
        else:
            # Create new patient record in patients collection
            await patients_collection.insert_one(patient_data)
            logger.info(f"✅ New patient {patient_data['full_name']} created and assigned to doctor {doctor_id}")

        return True
    except Exception as e:
        # Best-effort helper: failures are reported to the caller as False.
        logger.error(f"⚠️ Warning: Failed to assign patient to doctor: {str(e)}")
        return False
| # --- MAIN APPOINTMENTS ENDPOINT --- | |
async def get_appointments(
    page: int = Query(1, ge=1, description="Page number"),
    limit: int = Query(10, ge=1, le=100, description="Number of appointments per page"),
    status_filter: Optional[str] = Query(None, description="Filter by status"),
    date_from: Optional[date] = Query(None, description="Filter from date"),
    date_to: Optional[date] = Query(None, description="Filter to date"),
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Return one page of appointments visible to the current user.

    Patients see only their own appointments, doctors only those assigned to
    them; any other user gets no user-specific restriction.

    Args:
        page: 1-based page number.
        limit: Page size (1-100).
        status_filter: Optional exact match on the ``status`` field.
        date_from: Optional inclusive lower bound on the appointment date.
        date_to: Optional inclusive upper bound on the appointment date.
        current_user: Authenticated user document.
        db_client: Database handle.

    Returns:
        AppointmentListResponse with the page contents and paging totals.

    Raises:
        HTTPException: 500 if anything fails while querying or building the response.
    """
    try:
        query = {}

        if status_filter:
            query["status"] = status_filter

        # Dates are compared as ISO strings, matching how they are stored.
        if date_from or date_to:
            date_query = {}
            if date_from:
                date_query["$gte"] = date_from.isoformat()
            if date_to:
                date_query["$lte"] = date_to.isoformat()
            query["date"] = date_query

        # Restrict the result set according to the caller's role.
        if 'patient' in current_user.get('roles', []) or current_user.get('role') == 'patient':
            query["patient_id"] = ObjectId(current_user["_id"])
        elif 'doctor' in current_user.get('roles', []) or current_user.get('role') == 'doctor':
            query["doctor_id"] = ObjectId(current_user["_id"])

        skip = (page - 1) * limit

        # "date" alone is not unique, so skip/limit paging could repeat or drop
        # rows between pages; time and _id make the ordering total.
        cursor = (
            db_client.appointments.find(query)
            .sort([("date", -1), ("time", -1), ("_id", -1)])
            .skip(skip)
            .limit(limit)
        )
        appointments = await cursor.to_list(length=limit)

        total_count = await db_client.appointments.count_documents(query)

        # Resolve every referenced user in one query instead of two per row.
        user_ids = list({
            apt[field]
            for apt in appointments
            for field in ("patient_id", "doctor_id")
        })
        users_by_id = {}
        if user_ids:
            async for user in db_client.users.find({"_id": {"$in": user_ids}}):
                users_by_id[user["_id"]] = user

        appointment_responses = []
        for apt in appointments:
            patient = users_by_id.get(apt["patient_id"])
            doctor = users_by_id.get(apt["doctor_id"])

            # Stored values are strings; the response model gets date/time objects.
            apt_date = datetime.strptime(apt["date"], '%Y-%m-%d').date() if isinstance(apt["date"], str) else apt["date"]
            apt_time = datetime.strptime(apt["time"], '%H:%M:%S').time() if isinstance(apt["time"], str) else apt["time"]

            appointment_responses.append(AppointmentResponse(
                id=str(apt["_id"]),
                patient_id=str(apt["patient_id"]),
                doctor_id=str(apt["doctor_id"]),
                patient_name=patient['full_name'] if patient else "Unknown Patient",
                doctor_name=doctor['full_name'] if doctor else "Unknown Doctor",
                date=apt_date,
                time=apt_time,
                type=apt.get("type", "consultation"),
                status=apt.get("status", "pending"),
                reason=apt.get("reason"),
                notes=apt.get("notes"),
                duration=apt.get("duration", 30),
                created_at=apt.get("created_at", datetime.now()),
                updated_at=apt.get("updated_at", datetime.now())
            ))

        return AppointmentListResponse(
            appointments=appointment_responses,
            total=total_count,
            page=page,
            limit=limit,
            total_pages=(total_count + limit - 1) // limit
        )
    except Exception as e:
        # logger.exception keeps the traceback, which the plain message lost.
        logger.exception(f"Error fetching appointments: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to fetch appointments"
        )
| # --- HELPER FUNCTIONS --- | |
def is_valid_object_id(id_str: str) -> bool:
    """Return True when ``id_str`` can be parsed as a BSON ObjectId.

    Uses ``ObjectId.is_valid`` instead of constructing an ObjectId under a
    bare ``except``: the constructor accepts ``None`` (it generates a new id),
    so ``None`` used to pass validation, and the bare ``except`` hid unrelated
    exceptions.

    NOTE(review): this module defines ``is_valid_object_id`` twice; this later
    definition is the one in effect at runtime.

    Args:
        id_str: Candidate identifier, normally a 24-character hex string.

    Returns:
        True if the value is a well-formed ObjectId, False otherwise
        (including ``None`` and the empty string).
    """
    return ObjectId.is_valid(id_str)
def get_weekday_from_date(appointment_date: date) -> int:
    """Map a calendar date to its weekday index (Monday is 0, Sunday is 6)."""
    weekday_index = appointment_date.weekday()
    return weekday_index
def generate_time_slots(start_time: time, end_time: time, duration: int = 30) -> List[time]:
    """Generate slot start times from ``start_time`` up to, but excluding, ``end_time``.

    Args:
        start_time: First slot. A ``datetime.time``; an ISO ``"HH:MM[:SS]"``
            string or a ``datetime`` is also accepted and converted.
        end_time: Exclusive upper bound, same accepted types as ``start_time``.
        duration: Slot length in minutes; must be positive.

    Returns:
        List of ``datetime.time`` values, empty when ``start_time >= end_time``.

    Raises:
        ValueError: If ``duration`` is not positive (the loop would never
            terminate) or a string bound is not a valid ISO time.
    """
    if duration <= 0:
        raise ValueError("duration must be a positive number of minutes")

    def _as_time(value):
        """Coerce datetimes and ISO strings to time; pass time objects through."""
        if isinstance(value, datetime):
            return value.time()
        if isinstance(value, str):
            return time.fromisoformat(value)
        return value

    # Anchor both bounds to the same day; two separate date.today() calls
    # could land on different dates around midnight.
    anchor_day = date.today()
    current = datetime.combine(anchor_day, _as_time(start_time))
    stop = datetime.combine(anchor_day, _as_time(end_time))
    step = timedelta(minutes=duration)

    slots = []
    while current < stop:
        slots.append(current.time())
        current += step
    return slots
| # --- PATIENT ASSIGNMENT ENDPOINT --- | |
async def assign_patient_to_doctor(
    patient_id: str,
    doctor_id: str,
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Assign a patient to a doctor on behalf of a doctor or admin user.

    Raises HTTPException 403 for other roles, 400 for malformed ids, 404 when
    the doctor (or patient) does not exist, and 500 when the assignment helper
    reports failure. On success returns a confirmation message with both ids.
    """
    caller_roles = current_user.get('roles', [])
    caller_role = current_user.get('role')
    caller_is_doctor = 'doctor' in caller_roles or caller_role == 'doctor'
    caller_is_admin = caller_is_doctor or 'admin' in caller_roles or caller_role == 'admin'
    if not caller_is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only doctors and admins can assign patients"
        )

    # Both identifiers must be well-formed before touching the database.
    ids_ok = is_valid_object_id(patient_id) and is_valid_object_id(doctor_id)
    if not ids_ok:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid patient_id or doctor_id"
        )

    # The target must exist and actually hold the doctor role.
    doctor = await db_client.users.find_one({"_id": ObjectId(doctor_id)})
    if not doctor or (doctor.get('role') != 'doctor' and 'doctor' not in doctor.get('roles', [])):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Doctor not found"
        )

    patient = await db_client.users.find_one({"_id": ObjectId(patient_id)})
    if not patient:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Patient not found"
        )

    # Delegate the actual write to the shared helper.
    assigned = await assign_patient_to_doctor_in_collection(patient_id, doctor_id, db_client, None)
    if not assigned:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="Failed to assign patient to doctor"
        )

    return {
        "message": f"Patient {patient.get('full_name', 'Unknown')} successfully assigned to doctor {doctor.get('full_name', 'Unknown')}",
        "patient_id": patient_id,
        "doctor_id": doctor_id
    }
| # --- BULK PATIENT ASSIGNMENT ENDPOINT --- | |
async def process_confirmed_appointments(
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Run the patient-to-doctor assignment for every confirmed appointment.

    Admin-only (HTTPException 403 otherwise). Each appointment is processed
    independently; per-appointment failures are collected in ``errors`` rather
    than aborting the run. Any failure outside the per-appointment handling
    becomes an HTTPException 500.
    """
    caller_roles = current_user.get('roles', [])
    if 'admin' not in caller_roles and current_user.get('role') != 'admin':
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only administrators can run bulk operations"
        )

    try:
        confirmed_query = {"status": "confirmed"}
        confirmed_appointments = await db_client.appointments.find(confirmed_query).to_list(length=None)

        errors = []
        processed_count = 0

        for appointment in confirmed_appointments:
            try:
                assigned = await assign_patient_to_doctor_in_collection(
                    str(appointment["patient_id"]),
                    str(appointment["doctor_id"]),
                    db_client,
                    None
                )
            except Exception as e:
                errors.append(f"Error processing appointment {appointment['_id']}: {str(e)}")
            else:
                if assigned:
                    processed_count += 1
                else:
                    errors.append(f"Failed to process appointment {appointment['_id']}")

        summary = {
            "message": f"Processed {processed_count} confirmed appointments",
            "processed_count": processed_count,
            "total_appointments": len(confirmed_appointments),
            "errors": errors
        }
        return summary
    except Exception as e:
        logger.error(f"❌ Error processing confirmed appointments: {str(e)}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Failed to process confirmed appointments: {str(e)}"
        )
| # --- DOCTOR LIST ENDPOINT (MUST BE BEFORE PARAMETERIZED ROUTES) --- | |
async def get_doctors(
    specialty: Optional[str] = Query(None),
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """List users holding the doctor role, optionally filtered by specialty.

    Args:
        specialty: Optional case-insensitive substring to match against the
            ``specialty`` field. Treated as literal text, not as a regex.
        current_user: Authenticated user (required for access; otherwise unused).
        db_client: Database handle.

    Returns:
        A list of DoctorListResponse, one per matching user.
    """
    import re  # local import: only needed to escape the user-supplied filter

    # A doctor is identified either by the "roles" array or the legacy
    # singular "role" field.
    is_doctor = {
        "$or": [
            {"roles": {"$in": ["doctor"]}},
            {"role": "doctor"}
        ]
    }

    if specialty:
        # Escape the query parameter so regex metacharacters cannot alter the
        # pattern (regex injection / catastrophic backtracking).
        filter_query = {
            "$and": [
                is_doctor,
                {"specialty": {"$regex": re.escape(specialty), "$options": "i"}}
            ]
        }
    else:
        filter_query = is_doctor

    cursor = db_client.users.find(filter_query)
    doctors = await cursor.to_list(length=None)

    return [
        DoctorListResponse(
            id=str(doctor["_id"]),
            full_name=doctor['full_name'],
            specialty=doctor.get('specialty', ''),
            license_number=doctor.get('license_number', ''),
            email=doctor['email'],
            phone=doctor.get('phone')
        )
        for doctor in doctors
    ]
| # --- DOCTOR AVAILABILITY ENDPOINTS --- | |
async def create_doctor_availability(
    availability_data: DoctorAvailabilityCreate,
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Create a weekly availability entry for a doctor.

    Doctors may create entries only for themselves; admins may create them for
    any doctor. Only one entry per doctor and day of week is allowed.

    Args:
        availability_data: Requested availability (doctor id, day of week,
            start/end time, availability flag).
        current_user: Authenticated user document.
        db_client: Database handle.

    Returns:
        DoctorAvailabilityResponse describing the stored entry.

    Raises:
        HTTPException: 403 when the caller is neither doctor nor admin, or a
            doctor targets someone else; 400 for a malformed doctor id; 404
            when the doctor does not exist; 409 when the day already has an entry.
    """
    caller_roles = current_user.get('roles', [])
    caller_role = current_user.get('role')
    # Both the singular "role" field and the "roles" array are honoured, for
    # admins as well as doctors (the admin check used to ignore "role").
    is_admin = caller_role == 'admin' or 'admin' in caller_roles
    is_doctor = caller_role == 'doctor' or 'doctor' in caller_roles

    if not (is_doctor or is_admin):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only doctors can set their availability"
        )

    # Reject malformed ids up front instead of failing inside ObjectId().
    if not is_valid_object_id(availability_data.doctor_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid doctor ID"
        )
    doctor_oid = ObjectId(availability_data.doctor_id)

    # Check if doctor exists
    doctor = await db_client.users.find_one({"_id": doctor_oid})
    if not doctor:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Doctor not found"
        )

    # Compare as strings: the user's _id may be an ObjectId or a str.
    if not is_admin and str(availability_data.doctor_id) != str(current_user["_id"]):
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only set your own availability"
        )

    # Check if availability already exists for this day
    existing = await db_client.doctor_availability.find_one({
        "doctor_id": doctor_oid,
        "day_of_week": availability_data.day_of_week
    })
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Availability already exists for this day"
        )

    # Create availability
    availability_doc = {
        "doctor_id": doctor_oid,
        "day_of_week": availability_data.day_of_week,
        "start_time": availability_data.start_time,
        "end_time": availability_data.end_time,
        "is_available": availability_data.is_available
    }
    result = await db_client.doctor_availability.insert_one(availability_doc)
    availability_doc["_id"] = result.inserted_id

    return DoctorAvailabilityResponse(
        id=str(availability_doc["_id"]),
        doctor_id=availability_data.doctor_id,
        doctor_name=doctor['full_name'],
        day_of_week=availability_doc["day_of_week"],
        start_time=availability_doc["start_time"],
        end_time=availability_doc["end_time"],
        is_available=availability_doc["is_available"]
    )
async def get_doctor_availability(
    doctor_id: str,
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Return every availability entry stored for the given doctor.

    Raises HTTPException 400 when ``doctor_id`` is malformed and 404 when no
    user with that id exists.
    """
    if not is_valid_object_id(doctor_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid doctor ID"
        )

    doctor_oid = ObjectId(doctor_id)

    # The user must exist; their name is echoed on every entry.
    doctor = await db_client.users.find_one({"_id": doctor_oid})
    if not doctor:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Doctor not found"
        )

    stored_entries = await db_client.doctor_availability.find({"doctor_id": doctor_oid}).to_list(length=None)

    responses = []
    for entry in stored_entries:
        responses.append(
            DoctorAvailabilityResponse(
                id=str(entry["_id"]),
                doctor_id=doctor_id,
                doctor_name=doctor['full_name'],
                day_of_week=entry["day_of_week"],
                start_time=entry["start_time"],
                end_time=entry["end_time"],
                is_available=entry["is_available"]
            )
        )
    return responses
| # --- AVAILABLE SLOTS ENDPOINTS --- | |
async def get_available_slots(
    doctor_id: str,
    date: date,
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Return the appointment slots of a doctor for one date, marking booked ones.

    Args:
        doctor_id: String ObjectId of the doctor.
        date: Day to inspect. Note this parameter shadows ``datetime.date``
            inside the function body.
        current_user: Authenticated user (required for access).
        db_client: Database handle.

    Returns:
        AvailableSlotsResponse; ``slots`` is empty when the doctor has no
        availability entry flagged available for that weekday.

    Raises:
        HTTPException: 400 for a malformed id, 404 when the user does not
            exist or is not a doctor.
    """
    if not is_valid_object_id(doctor_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid doctor ID"
        )

    doctor_oid = ObjectId(doctor_id)

    # Check if doctor exists
    doctor = await db_client.users.find_one({"_id": doctor_oid})
    if not doctor or (doctor.get('role') != 'doctor' and 'doctor' not in doctor.get('roles', [])):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Doctor not found"
        )

    # Get doctor availability for this day
    weekday = get_weekday_from_date(date)
    availability = await db_client.doctor_availability.find_one({
        "doctor_id": doctor_oid,
        "day_of_week": weekday,
        "is_available": True
    })
    if not availability:
        return AvailableSlotsResponse(
            doctor_id=doctor_id,
            doctor_name=doctor['full_name'],
            specialty=doctor.get('specialty', ''),
            date=date,
            slots=[]
        )

    time_slots = generate_time_slots(availability["start_time"], availability["end_time"])

    # Appointments store "date" as a 'YYYY-MM-DD' string, and a datetime.date
    # cannot be BSON-encoded, so query with the ISO string.
    existing_appointments = await db_client.appointments.find({
        "doctor_id": doctor_oid,
        "date": date.isoformat(),
        "status": {"$in": ["pending", "confirmed"]}
    }).to_list(length=None)

    def _time_key(value):
        """Normalise a stored or generated time to its 'HH:MM:SS' string."""
        return value.strftime('%H:%M:%S') if hasattr(value, "strftime") else str(value)

    # Appointments store "time" as 'HH:MM:SS' strings while slots are time
    # objects; index bookings by the normalised string so they compare equal.
    booked_by_time = {}
    for apt in existing_appointments:
        booked_by_time.setdefault(_time_key(apt["time"]), apt)

    slots = []
    for slot_time in time_slots:
        booking = booked_by_time.get(_time_key(slot_time))
        slots.append(AppointmentSlot(
            date=date,
            time=slot_time,
            is_available=booking is None,
            appointment_id=str(booking["_id"]) if booking else None
        ))

    return AvailableSlotsResponse(
        doctor_id=doctor_id,
        doctor_name=doctor['full_name'],
        specialty=doctor.get('specialty', ''),
        date=date,
        slots=slots
    )
| # --- APPOINTMENT ENDPOINTS --- | |
async def create_appointment(
    appointment_data: AppointmentCreate,
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Create a new appointment for the authenticated patient.

    Args:
        appointment_data: Requested appointment; ``date`` and ``time`` are
            strings in YYYY-MM-DD and HH:MM:SS form.
        current_user: Authenticated user document.
        db_client: Database handle.

    Returns:
        AppointmentResponse for the stored appointment (status pending).

    Raises:
        HTTPException: 403 when the caller is not a patient or books for
            someone else; 400 for malformed ids, bad date/time format or a
            non-future slot; 404 when patient or doctor is missing; 409 when
            the slot is already taken.
    """
    # Log identifiers only: the previous print() calls dumped the complete
    # request and user record (personal data) to stdout.
    logger.debug(
        "Creating appointment: patient=%s doctor=%s date=%s time=%s",
        appointment_data.patient_id,
        appointment_data.doctor_id,
        appointment_data.date,
        appointment_data.time,
    )

    # Check if user is a patient
    if 'patient' not in current_user.get('roles', []) and current_user.get('role') != 'patient':
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Only patients can create appointments"
        )

    # Validate ObjectIds
    if not is_valid_object_id(appointment_data.patient_id) or not is_valid_object_id(appointment_data.doctor_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid patient_id or doctor_id"
        )

    patient_oid = ObjectId(appointment_data.patient_id)
    doctor_oid = ObjectId(appointment_data.doctor_id)

    # Check if patient exists and matches current user
    patient = await db_client.users.find_one({"_id": patient_oid})
    if not patient:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Patient not found"
        )
    if patient['email'] != current_user['email']:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only create appointments for yourself"
        )

    # Check if doctor exists and is a doctor
    doctor = await db_client.users.find_one({"_id": doctor_oid})
    if not doctor or (doctor.get('role') != 'doctor' and 'doctor' not in doctor.get('roles', [])):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Doctor not found"
        )

    # Convert string date and time to proper objects
    try:
        appointment_date = datetime.strptime(appointment_data.date, '%Y-%m-%d').date()
        appointment_time = datetime.strptime(appointment_data.time, '%H:%M:%S').time()
        appointment_datetime = datetime.combine(appointment_date, appointment_time)
    except (ValueError, TypeError):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid date or time format. Use YYYY-MM-DD for date and HH:MM:SS for time"
        )

    # strptime accepts unpadded input such as "2024-1-5"; persist the canonical
    # zero-padded form so string equality and range queries stay reliable.
    date_str = appointment_date.isoformat()
    time_str = appointment_time.strftime('%H:%M:%S')

    # Check if appointment date is in the future
    if appointment_datetime <= datetime.now():
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Appointment must be scheduled for a future date and time"
        )

    # Check if slot is available
    existing_appointment = await db_client.appointments.find_one({
        "doctor_id": doctor_oid,
        "date": date_str,
        "time": time_str,
        "status": {"$in": ["pending", "confirmed"]}
    })
    if existing_appointment:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="This time slot is already booked"
        )

    # Create appointment
    now = datetime.now()
    appointment_doc = {
        "patient_id": patient_oid,
        "doctor_id": doctor_oid,
        "date": date_str,  # Store as string
        "time": time_str,  # Store as string
        "datetime": appointment_datetime,  # Store full datetime for easier querying
        "type": appointment_data.type,
        "status": AppointmentStatus.PENDING,
        "reason": appointment_data.reason,
        "notes": appointment_data.notes,
        "duration": appointment_data.duration,
        "created_at": now,
        "updated_at": now
    }
    result = await db_client.appointments.insert_one(appointment_doc)
    appointment_doc["_id"] = result.inserted_id

    # NOTE(review): the stored status is always PENDING, yet a requested
    # status of "confirmed" still triggers the doctor assignment — confirm
    # this is intended.
    if appointment_data.status == "confirmed":
        appointment_dict = appointment_data.dict()
        await assign_patient_to_doctor_in_collection(appointment_data.patient_id, appointment_data.doctor_id, db_client, appointment_dict)

    # Return appointment with patient and doctor names
    return AppointmentResponse(
        id=str(appointment_doc["_id"]),
        patient_id=appointment_data.patient_id,
        doctor_id=appointment_data.doctor_id,
        patient_name=patient['full_name'],
        doctor_name=doctor['full_name'],
        date=appointment_date,
        time=appointment_time,
        type=appointment_doc["type"],
        status=appointment_doc["status"],
        reason=appointment_doc["reason"],
        notes=appointment_doc["notes"],
        duration=appointment_doc["duration"],
        created_at=appointment_doc["created_at"],
        updated_at=appointment_doc["updated_at"]
    )
async def get_appointment(
    appointment_id: str,
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Fetch a single appointment the current user is allowed to see.

    Patients may view only their own appointments, doctors only those assigned
    to them, and admins any. Raises HTTPException 400 for a malformed id, 404
    when the appointment is missing and 403 when access is denied.
    """
    if not is_valid_object_id(appointment_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid appointment ID"
        )

    appointment = await db_client.appointments.find_one({"_id": ObjectId(appointment_id)})
    if not appointment:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Appointment not found"
        )

    # Work out why (if at all) the caller must be refused.
    denial = None
    if 'patient' in current_user.get('roles', []) or current_user.get('role') == 'patient':
        if appointment["patient_id"] != ObjectId(current_user["_id"]):
            denial = "You can only view your own appointments"
    elif 'doctor' in current_user.get('roles', []) or current_user.get('role') == 'doctor':
        if appointment["doctor_id"] != ObjectId(current_user["_id"]):
            denial = "You can only view appointments assigned to you"
    elif current_user.get('role') != 'admin' and 'admin' not in current_user.get('roles', []):
        denial = "Insufficient permissions"

    if denial is not None:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=denial
        )

    # Names come from the users collection; either side may have been removed.
    patient = await db_client.users.find_one({"_id": appointment["patient_id"]})
    doctor = await db_client.users.find_one({"_id": appointment["doctor_id"]})

    # Stored strings are parsed back into date/time objects for the response.
    raw_date = appointment["date"]
    if isinstance(raw_date, str):
        apt_date = datetime.strptime(raw_date, '%Y-%m-%d').date()
    else:
        apt_date = raw_date

    raw_time = appointment["time"]
    if isinstance(raw_time, str):
        apt_time = datetime.strptime(raw_time, '%H:%M:%S').time()
    else:
        apt_time = raw_time

    patient_name = patient['full_name'] if patient else "Unknown Patient"
    doctor_name = doctor['full_name'] if doctor else "Unknown Doctor"

    return AppointmentResponse(
        id=str(appointment["_id"]),
        patient_id=str(appointment["patient_id"]),
        doctor_id=str(appointment["doctor_id"]),
        patient_name=patient_name,
        doctor_name=doctor_name,
        date=apt_date,
        time=apt_time,
        type=appointment.get("type", "consultation"),  # fallback for records lacking a type
        status=appointment.get("status", "pending"),  # fallback for records lacking a status
        reason=appointment.get("reason"),
        notes=appointment.get("notes"),
        duration=appointment.get("duration", 30),
        created_at=appointment.get("created_at", datetime.now()),
        updated_at=appointment.get("updated_at", datetime.now())
    )
| async def update_appointment( | |
| appointment_id: str, | |
| appointment_data: AppointmentUpdate, | |
| current_user: dict = Depends(get_current_user), | |
| db_client: AsyncIOMotorClient = Depends(lambda: db) | |
| ): | |
| """Update an appointment""" | |
| if not is_valid_object_id(appointment_id): | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid appointment ID" | |
| ) | |
| appointment = await db_client.appointments.find_one({"_id": ObjectId(appointment_id)}) | |
| if not appointment: | |
| raise HTTPException( | |
| status_code=status.HTTP_404_NOT_FOUND, | |
| detail="Appointment not found" | |
| ) | |
| # Check permissions | |
| can_update = False | |
| if 'admin' in current_user.get('roles', []) or current_user.get('role') == 'admin': | |
| can_update = True | |
| elif 'doctor' in current_user.get('roles', []) or current_user.get('role') == 'doctor': | |
| if appointment["doctor_id"] == ObjectId(current_user["_id"]): | |
| can_update = True | |
| elif 'patient' in current_user.get('roles', []) or current_user.get('role') == 'patient': | |
| if appointment["patient_id"] == ObjectId(current_user["_id"]): | |
| # Patients can only update certain fields | |
| can_update = True | |
| if not can_update: | |
| raise HTTPException( | |
| status_code=status.HTTP_403_FORBIDDEN, | |
| detail="You can only update appointments you're involved with" | |
| ) | |
| # Build update data | |
| update_data = {"updated_at": datetime.now()} | |
| if appointment_data.date is not None: | |
| try: | |
| # Store as string for MongoDB compatibility | |
| update_data["date"] = appointment_data.date | |
| except ValueError: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid date format. Use YYYY-MM-DD" | |
| ) | |
| if appointment_data.time is not None: | |
| try: | |
| # Store as string for MongoDB compatibility | |
| update_data["time"] = appointment_data.time | |
| except ValueError: | |
| raise HTTPException( | |
| status_code=status.HTTP_400_BAD_REQUEST, | |
| detail="Invalid time format. Use HH:MM:SS" | |
| ) | |
| if appointment_data.type is not None: | |
| update_data["type"] = appointment_data.type | |
| if appointment_data.reason is not None: | |
| update_data["reason"] = appointment_data.reason | |
| if appointment_data.notes is not None: | |
| update_data["notes"] = appointment_data.notes | |
| if appointment_data.duration is not None: | |
| update_data["duration"] = appointment_data.duration | |
| # Only doctors and admins can update status | |
| if appointment_data.status is not None and (('doctor' in current_user.get('roles', []) or current_user.get('role') == 'doctor') or ('admin' in current_user.get('roles', []) or current_user.get('role') == 'admin')): | |
| update_data["status"] = appointment_data.status | |
| # Check for conflicts if date/time is being updated | |
| if appointment_data.date is not None or appointment_data.time is not None: | |
| new_date = update_data.get("date") or appointment["date"] | |
| new_time = update_data.get("time") or appointment["time"] | |
| existing_appointment = await db_client.appointments.find_one({ | |
| "_id": {"$ne": ObjectId(appointment_id)}, | |
| "doctor_id": appointment["doctor_id"], | |
| "date": new_date, | |
| "time": new_time, | |
| "status": {"$in": ["pending", "confirmed"]} | |
| }) | |
| if existing_appointment: | |
| raise HTTPException( | |
| status_code=status.HTTP_409_CONFLICT, | |
| detail="This time slot is already booked" | |
| ) | |
| # Update appointment | |
| await db_client.appointments.update_one( | |
| {"_id": ObjectId(appointment_id)}, | |
| {"$set": update_data} | |
| ) | |
| # If appointment status is being changed to confirmed, assign patient to doctor | |
| if appointment_data.status == "confirmed": | |
| # Convert appointment_data to dict for the helper function | |
| appointment_dict = appointment_data.dict() | |
| await assign_patient_to_doctor_in_collection(str(appointment["patient_id"]), str(appointment["doctor_id"]), db_client, appointment_dict) | |
| # Get updated appointment | |
| updated_appointment = await db_client.appointments.find_one({"_id": ObjectId(appointment_id)}) | |
| # Get patient and doctor names | |
| patient = await db_client.users.find_one({"_id": updated_appointment["patient_id"]}) | |
| doctor = await db_client.users.find_one({"_id": updated_appointment["doctor_id"]}) | |
| # Convert string date/time back to objects for response | |
| apt_date = datetime.strptime(updated_appointment["date"], '%Y-%m-%d').date() if isinstance(updated_appointment["date"], str) else updated_appointment["date"] | |
| apt_time = datetime.strptime(updated_appointment["time"], '%H:%M:%S').time() if isinstance(updated_appointment["time"], str) else updated_appointment["time"] | |
| return AppointmentResponse( | |
| id=str(updated_appointment["_id"]), | |
| patient_id=str(updated_appointment["patient_id"]), | |
| doctor_id=str(updated_appointment["doctor_id"]), | |
| patient_name=patient['full_name'] if patient else "Unknown Patient", | |
| doctor_name=doctor['full_name'] if doctor else "Unknown Doctor", | |
| date=apt_date, | |
| time=apt_time, | |
| type=updated_appointment.get("type", "consultation"), # Default to consultation if missing | |
| status=updated_appointment.get("status", "pending"), # Default to pending if missing | |
| reason=updated_appointment.get("reason"), | |
| notes=updated_appointment.get("notes"), | |
| duration=updated_appointment.get("duration", 30), | |
| created_at=updated_appointment.get("created_at", datetime.now()), | |
| updated_at=updated_appointment.get("updated_at", datetime.now()) | |
| ) | |
async def delete_appointment(
    appointment_id: str,
    current_user: dict = Depends(get_current_user),
    db_client: AsyncIOMotorClient = Depends(lambda: db)
):
    """Delete an appointment.

    The appointment is looked up by ``appointment_id`` and removed from the
    ``appointments`` collection when the caller is allowed to do so:

    * a user holding the ``admin`` role may delete any appointment;
    * a user holding the ``patient`` role may delete an appointment only
      when its ``patient_id`` equals the caller's own ``_id``.

    Every other caller is rejected. A role is recognised either as a member
    of the user's ``roles`` list or as the value of the ``role`` field.

    Raises:
        HTTPException: 400 when ``appointment_id`` is not a valid ObjectId,
            404 when no appointment with that id exists, and 403 when the
            caller is not permitted to delete it.
    """
    # Reject malformed ids up front so ObjectId() below cannot fail.
    if not is_valid_object_id(appointment_id):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid appointment ID",
        )

    target_id = ObjectId(appointment_id)
    record = await db_client.appointments.find_one({"_id": target_id})
    if not record:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="Appointment not found",
        )

    def holds_role(role_name: str) -> bool:
        # A role may live in the "roles" list or in the single "role" field.
        return (
            role_name in current_user.get('roles', [])
            or current_user.get('role') == role_name
        )

    # Admins always pass; patients pass only for their own appointment.
    # The caller's _id is converted lazily, only on the patient path.
    if holds_role('admin'):
        allowed = True
    else:
        allowed = (
            holds_role('patient')
            and record["patient_id"] == ObjectId(current_user["_id"])
        )

    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="You can only delete your own appointments",
        )

    await db_client.appointments.delete_one({"_id": target_id})