""" Health Record Merger Merge and aggregate health records from multiple days """ from datetime import datetime, timedelta from typing import List, Dict, Any, Optional from collections import defaultdict from .pydantic_models import ( HealthRecord, NutritionRecord, ExerciseRecord, SymptomRecord, MentalHealthRecord, RecordType ) class HealthRecordMerger: """Merge and aggregate health records""" @staticmethod def merge_records( records: List[HealthRecord], merge_strategy: str = 'latest' ) -> Dict[str, Any]: """ Merge multiple health records into aggregated data Args: records: List of health records to merge merge_strategy: 'latest', 'average', 'all' Returns: Merged data dictionary """ if not records: return {} # Group records by type records_by_type = defaultdict(list) for record in records: records_by_type[record.record_type].append(record) merged = { 'total_records': len(records), 'date_range': { 'start': min(r.timestamp for r in records).isoformat(), 'end': max(r.timestamp for r in records).isoformat() }, 'by_type': {} } # Merge each type for record_type, type_records in records_by_type.items(): if record_type == RecordType.NUTRITION: merged['by_type']['nutrition'] = HealthRecordMerger._merge_nutrition_records( type_records, merge_strategy ) elif record_type == RecordType.EXERCISE: merged['by_type']['exercise'] = HealthRecordMerger._merge_exercise_records( type_records, merge_strategy ) elif record_type == RecordType.SYMPTOM: merged['by_type']['symptom'] = HealthRecordMerger._merge_symptom_records( type_records, merge_strategy ) elif record_type == RecordType.MENTAL_HEALTH: merged['by_type']['mental_health'] = HealthRecordMerger._merge_mental_health_records( type_records, merge_strategy ) # Extract common health metrics merged['health_metrics'] = HealthRecordMerger._extract_health_metrics( records, merge_strategy ) return merged @staticmethod def _merge_nutrition_records( records: List[HealthRecord], strategy: str ) -> Dict[str, Any]: """Merge nutrition records""" if strategy == 'latest': latest = max(records, key=lambda r: r.timestamp) return { 'latest_record': latest.model_dump(), 'total_records': len(records) } elif strategy == 'average': # Calculate averages total_calories = sum(r.data.get('calories', 0) for r in records if r.data.get('calories')) total_protein = sum(r.data.get('protein', 0) for r in records if r.data.get('protein')) total_carbs = sum(r.data.get('carbs', 0) for r in records if r.data.get('carbs')) total_fat = sum(r.data.get('fat', 0) for r in records if r.data.get('fat')) count = len(records) return { 'average_daily': { 'calories': round(total_calories / count, 1) if count > 0 else 0, 'protein': round(total_protein / count, 1) if count > 0 else 0, 'carbs': round(total_carbs / count, 1) if count > 0 else 0, 'fat': round(total_fat / count, 1) if count > 0 else 0 }, 'total': { 'calories': round(total_calories, 1), 'protein': round(total_protein, 1), 'carbs': round(total_carbs, 1), 'fat': round(total_fat, 1) }, 'total_records': count } else: # 'all' return { 'all_records': [r.model_dump() for r in records], 'total_records': len(records) } @staticmethod def _merge_exercise_records( records: List[HealthRecord], strategy: str ) -> Dict[str, Any]: """Merge exercise records""" if strategy == 'latest': latest = max(records, key=lambda r: r.timestamp) return { 'latest_record': latest.model_dump(), 'total_records': len(records) } elif strategy == 'average': total_duration = sum(r.data.get('duration_minutes', 0) for r in records) total_calories = sum(r.data.get('calories_burned', 0) for r in records) # Count by exercise type exercise_types = defaultdict(int) for r in records: ex_type = r.data.get('exercise_type', 'unknown') exercise_types[ex_type] += 1 count = len(records) return { 'total_workouts': count, 'total_duration_minutes': total_duration, 'total_calories_burned': round(total_calories, 1), 'average_duration': round(total_duration / count, 1) if count > 0 else 0, 'average_calories': round(total_calories / count, 1) if count > 0 else 0, 'exercise_types': dict(exercise_types) } else: # 'all' return { 'all_records': [r.model_dump() for r in records], 'total_records': len(records) } @staticmethod def _merge_symptom_records( records: List[HealthRecord], strategy: str ) -> Dict[str, Any]: """Merge symptom records""" if strategy == 'latest': latest = max(records, key=lambda r: r.timestamp) return { 'latest_record': latest.model_dump(), 'total_records': len(records) } # Collect all symptoms all_symptoms = [] symptom_counts = defaultdict(int) for r in records: symptoms = r.data.get('symptoms', []) if isinstance(symptoms, list): all_symptoms.extend(symptoms) for symptom in symptoms: symptom_counts[symptom] += 1 return { 'total_reports': len(records), 'unique_symptoms': len(set(all_symptoms)), 'most_common_symptoms': sorted( symptom_counts.items(), key=lambda x: x[1], reverse=True )[:5], 'all_symptoms': list(set(all_symptoms)) } @staticmethod def _merge_mental_health_records( records: List[HealthRecord], strategy: str ) -> Dict[str, Any]: """Merge mental health records""" if strategy == 'latest': latest = max(records, key=lambda r: r.timestamp) return { 'latest_record': latest.model_dump(), 'total_records': len(records) } # Calculate averages stress_levels = [r.data.get('stress_level') for r in records if r.data.get('stress_level')] sleep_hours = [r.data.get('sleep_hours') for r in records if r.data.get('sleep_hours')] sleep_quality = [r.data.get('sleep_quality') for r in records if r.data.get('sleep_quality')] return { 'total_records': len(records), 'average_stress_level': round(sum(stress_levels) / len(stress_levels), 1) if stress_levels else None, 'average_sleep_hours': round(sum(sleep_hours) / len(sleep_hours), 1) if sleep_hours else None, 'average_sleep_quality': round(sum(sleep_quality) / len(sleep_quality), 1) if sleep_quality else None, 'stress_trend': 'improving' if len(stress_levels) >= 2 and stress_levels[-1] < stress_levels[0] else 'stable' } @staticmethod def _extract_health_metrics( records: List[HealthRecord], strategy: str ) -> Dict[str, Any]: """Extract common health metrics from records""" weights = [r.weight for r in records if r.weight] heights = [r.height for r in records if r.height] bmis = [r.bmi for r in records if r.bmi] metrics = {} if weights: metrics['weight'] = { 'latest': weights[-1], 'average': round(sum(weights) / len(weights), 1), 'min': min(weights), 'max': max(weights), 'change': round(weights[-1] - weights[0], 1) if len(weights) >= 2 else 0 } if heights: metrics['height'] = { 'latest': heights[-1], 'average': round(sum(heights) / len(heights), 1) } if bmis: metrics['bmi'] = { 'latest': bmis[-1], 'average': round(sum(bmis) / len(bmis), 1), 'change': round(bmis[-1] - bmis[0], 1) if len(bmis) >= 2 else 0 } return metrics @staticmethod def merge_by_date_range( records: List[HealthRecord], start_date: datetime, end_date: datetime, merge_strategy: str = 'average' ) -> Dict[str, Any]: """ Merge records within a specific date range Args: records: All health records start_date: Start of date range end_date: End of date range merge_strategy: How to merge data Returns: Merged data for the date range """ # Filter records by date range filtered = [ r for r in records if start_date <= r.timestamp <= end_date ] return HealthRecordMerger.merge_records(filtered, merge_strategy) @staticmethod def get_weekly_summary( records: List[HealthRecord], weeks_back: int = 1 ) -> Dict[str, Any]: """ Get weekly summary of health records Args: records: All health records weeks_back: Number of weeks to look back Returns: Weekly summary """ end_date = datetime.now() start_date = end_date - timedelta(weeks=weeks_back) return HealthRecordMerger.merge_by_date_range( records, start_date, end_date, merge_strategy='average' ) @staticmethod def get_monthly_summary( records: List[HealthRecord], months_back: int = 1 ) -> Dict[str, Any]: """ Get monthly summary of health records Args: records: All health records months_back: Number of months to look back Returns: Monthly summary """ end_date = datetime.now() start_date = end_date - timedelta(days=30 * months_back) return HealthRecordMerger.merge_by_date_range( records, start_date, end_date, merge_strategy='average' ) def merge_records( records: List[HealthRecord], strategy: str = 'latest' ) -> Dict[str, Any]: """ Convenience function to merge health records Args: records: List of health records strategy: 'latest', 'average', or 'all' Returns: Merged data dictionary """ return HealthRecordMerger.merge_records(records, strategy)