""" Tool definitions and execution handlers for FleetMind chat Simulates MCP tools using Claude's tool calling feature """ import sys from pathlib import Path from datetime import datetime, timedelta import logging # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) from database.connection import execute_write, execute_query, get_db_connection from chat.geocoding import GeocodingService from psycopg2.extras import RealDictCursor logger = logging.getLogger(__name__) # Initialize geocoding service geocoding_service = GeocodingService() # Tool schemas for Claude TOOLS_SCHEMA = [ { "name": "geocode_address", "description": "Convert a delivery address to GPS coordinates and validate the address format. Use this before creating an order to ensure the address is valid.", "input_schema": { "type": "object", "properties": { "address": { "type": "string", "description": "The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA')" } }, "required": ["address"] } }, { "name": "create_order", "description": "Create a new delivery order in the database. Only call this after geocoding the address successfully.", "input_schema": { "type": "object", "properties": { "customer_name": { "type": "string", "description": "Full name of the customer" }, "customer_phone": { "type": "string", "description": "Customer phone number (optional)" }, "customer_email": { "type": "string", "description": "Customer email address (optional)" }, "delivery_address": { "type": "string", "description": "Full delivery address" }, "delivery_lat": { "type": "number", "description": "Latitude from geocoding" }, "delivery_lng": { "type": "number", "description": "Longitude from geocoding" }, "time_window_end": { "type": "string", "description": "Delivery deadline in ISO format (e.g., '2025-11-13T17:00:00'). If not specified by user, default to 6 hours from now." }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Delivery priority. Default to 'standard' unless user specifies urgent/express." }, "special_instructions": { "type": "string", "description": "Any special delivery instructions (optional)" }, "weight_kg": { "type": "number", "description": "Package weight in kilograms (optional, default to 5.0)" } }, "required": ["customer_name", "delivery_address", "delivery_lat", "delivery_lng"] } }, { "name": "create_driver", "description": "Create a new delivery driver in the database. Use this to onboard new drivers to the fleet.", "input_schema": { "type": "object", "properties": { "name": { "type": "string", "description": "Full name of the driver" }, "phone": { "type": "string", "description": "Driver phone number (optional)" }, "email": { "type": "string", "description": "Driver email address (optional)" }, "vehicle_type": { "type": "string", "description": "Type of vehicle: van, truck, car, motorcycle (default: van)" }, "vehicle_plate": { "type": "string", "description": "Vehicle license plate number (optional)" }, "capacity_kg": { "type": "number", "description": "Vehicle cargo capacity in kilograms (default: 1000.0)" }, "capacity_m3": { "type": "number", "description": "Vehicle cargo volume in cubic meters (default: 12.0)" }, "skills": { "type": "array", "description": "List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery", "items": { "type": "string" } }, "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Driver status (default: active)" } }, "required": ["name"] } }, { "name": "count_orders", "description": "Count total orders in the database with optional filters. Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics.", "input_schema": { "type": "object", "properties": { "status": { "type": "string", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], "description": "Filter by order status (optional)" }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Filter by priority level (optional)" }, "payment_status": { "type": "string", "enum": ["pending", "paid", "cod"], "description": "Filter by payment status (optional)" }, "assigned_driver_id": { "type": "string", "description": "Filter by assigned driver ID (optional)" }, "is_fragile": { "type": "boolean", "description": "Filter fragile packages only (optional)" }, "requires_signature": { "type": "boolean", "description": "Filter orders requiring signature (optional)" }, "requires_cold_storage": { "type": "boolean", "description": "Filter orders requiring cold storage (optional)" } }, "required": [] } }, { "name": "fetch_orders", "description": "Fetch orders from the database with optional filters, pagination, and sorting. Use after counting to show specific number of orders.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of orders to fetch (default: 10, max: 100)" }, "offset": { "type": "integer", "description": "Number of orders to skip for pagination (default: 0)" }, "status": { "type": "string", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"], "description": "Filter by order status (optional)" }, "priority": { "type": "string", "enum": ["standard", "express", "urgent"], "description": "Filter by priority level (optional)" }, "payment_status": { "type": "string", "enum": ["pending", "paid", "cod"], "description": "Filter by payment status (optional)" }, "assigned_driver_id": { "type": "string", "description": "Filter by assigned driver ID (optional)" }, "is_fragile": { "type": "boolean", "description": "Filter fragile packages only (optional)" }, "requires_signature": { "type": "boolean", "description": "Filter orders requiring signature (optional)" }, "requires_cold_storage": { "type": "boolean", "description": "Filter orders requiring cold storage (optional)" }, "sort_by": { "type": "string", "enum": ["created_at", "priority", "time_window_start"], "description": "Field to sort by (default: created_at)" }, "sort_order": { "type": "string", "enum": ["ASC", "DESC"], "description": "Sort order (default: DESC for newest first)" } }, "required": [] } }, { "name": "get_order_details", "description": "Get complete details of a specific order by order ID. Use when user asks 'tell me about order X' or wants detailed information about a specific order.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "The order ID to fetch details for (e.g., 'ORD-20251114163800')" } }, "required": ["order_id"] } }, { "name": "search_orders", "description": "Search for orders by customer name, email, phone, or order ID pattern. Use when user provides partial information to find orders.", "input_schema": { "type": "object", "properties": { "search_term": { "type": "string", "description": "Search term to match against customer_name, customer_email, customer_phone, or order_id" } }, "required": ["search_term"] } }, { "name": "get_incomplete_orders", "description": "Get all orders that are not yet completed (excludes delivered and cancelled orders). Shortcut for finding orders in progress (pending, assigned, in_transit).", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of orders to fetch (default: 20)" } }, "required": [] } }, { "name": "count_drivers", "description": "Count total drivers in the database with optional filters. Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics.", "input_schema": { "type": "object", "properties": { "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Filter by driver status (optional)" }, "vehicle_type": { "type": "string", "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)" } }, "required": [] } }, { "name": "fetch_drivers", "description": "Fetch drivers from the database with optional filters, pagination, and sorting. Use after counting to show specific number of drivers.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of drivers to fetch (default: 10, max: 100)" }, "offset": { "type": "integer", "description": "Number of drivers to skip for pagination (default: 0)" }, "status": { "type": "string", "enum": ["active", "busy", "offline", "unavailable"], "description": "Filter by driver status (optional)" }, "vehicle_type": { "type": "string", "description": "Filter by vehicle type: van, truck, car, motorcycle, etc. (optional)" }, "sort_by": { "type": "string", "enum": ["name", "status", "created_at", "last_location_update"], "description": "Field to sort by (default: name)" }, "sort_order": { "type": "string", "enum": ["ASC", "DESC"], "description": "Sort order (default: ASC for alphabetical)" } }, "required": [] } }, { "name": "get_driver_details", "description": "Get complete details of a specific driver by driver ID, including current location (latitude, longitude, and human-readable address), contact info, vehicle details, status, and skills. Use when user asks about a driver's location, coordinates, position, or any other driver information.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "The driver ID to fetch details for (e.g., 'DRV-20251114163800')" } }, "required": ["driver_id"] } }, { "name": "search_drivers", "description": "Search for drivers by name, email, phone, vehicle plate, or driver ID pattern. Use when user provides partial information to find drivers.", "input_schema": { "type": "object", "properties": { "search_term": { "type": "string", "description": "Search term to match against name, email, phone, vehicle_plate, or driver_id" } }, "required": ["search_term"] } }, { "name": "get_available_drivers", "description": "Get all drivers that are available for assignment (active or offline status, excludes busy and unavailable). Shortcut for finding drivers ready for dispatch.", "input_schema": { "type": "object", "properties": { "limit": { "type": "integer", "description": "Number of drivers to fetch (default: 20)" } }, "required": [] } }, { "name": "update_order", "description": "Update an existing order's details. You can update any combination of fields. Only provide the fields you want to change.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "Order ID to update (e.g., 'ORD-20250114123456')" }, "customer_name": { "type": "string", "description": "Updated customer name" }, "customer_phone": { "type": "string", "description": "Updated customer phone number" }, "customer_email": { "type": "string", "description": "Updated customer email address" }, "delivery_address": { "type": "string", "description": "Updated delivery address" }, "delivery_lat": { "type": "number", "description": "Updated delivery latitude (required if updating address)" }, "delivery_lng": { "type": "number", "description": "Updated delivery longitude (required if updating address)" }, "status": { "type": "string", "description": "Updated order status", "enum": ["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] }, "priority": { "type": "string", "description": "Updated priority level", "enum": ["standard", "express", "urgent"] }, "special_instructions": { "type": "string", "description": "Updated special delivery instructions" }, "time_window_end": { "type": "string", "description": "Updated delivery deadline (ISO format datetime)" }, "payment_status": { "type": "string", "description": "Updated payment status", "enum": ["pending", "paid", "cod"] }, "weight_kg": { "type": "number", "description": "Updated package weight in kilograms" }, "order_value": { "type": "number", "description": "Updated order value in currency" } }, "required": ["order_id"] } }, { "name": "delete_order", "description": "Permanently delete an order from the database. This action cannot be undone. Use with caution.", "input_schema": { "type": "object", "properties": { "order_id": { "type": "string", "description": "Order ID to delete (e.g., 'ORD-20250114123456')" }, "confirm": { "type": "boolean", "description": "Must be set to true to confirm deletion" } }, "required": ["order_id", "confirm"] } }, { "name": "update_driver", "description": "Update an existing driver's details. You can update any combination of fields. Only provide the fields you want to change.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "Driver ID to update (e.g., 'DRV-20250114123456')" }, "name": { "type": "string", "description": "Updated driver name" }, "phone": { "type": "string", "description": "Updated phone number" }, "email": { "type": "string", "description": "Updated email address" }, "status": { "type": "string", "description": "Updated driver status", "enum": ["active", "busy", "offline", "unavailable"] }, "vehicle_type": { "type": "string", "description": "Updated vehicle type" }, "vehicle_plate": { "type": "string", "description": "Updated vehicle license plate" }, "capacity_kg": { "type": "number", "description": "Updated cargo capacity in kilograms" }, "capacity_m3": { "type": "number", "description": "Updated cargo capacity in cubic meters" }, "skills": { "type": "array", "items": {"type": "string"}, "description": "Updated list of driver skills/certifications" }, "current_lat": { "type": "number", "description": "Updated current latitude" }, "current_lng": { "type": "number", "description": "Updated current longitude" } }, "required": ["driver_id"] } }, { "name": "delete_driver", "description": "Permanently delete a driver from the database. This action cannot be undone. Use with caution.", "input_schema": { "type": "object", "properties": { "driver_id": { "type": "string", "description": "Driver ID to delete (e.g., 'DRV-20250114123456')" }, "confirm": { "type": "boolean", "description": "Must be set to true to confirm deletion" } }, "required": ["driver_id", "confirm"] } } ] def execute_tool(tool_name: str, tool_input: dict) -> dict: """ Route tool execution to appropriate handler Args: tool_name: Name of the tool to execute tool_input: Tool input parameters Returns: Dict with tool execution results """ try: if tool_name == "geocode_address": return handle_geocode_address(tool_input) elif tool_name == "create_order": return handle_create_order(tool_input) elif tool_name == "create_driver": return handle_create_driver(tool_input) elif tool_name == "count_orders": return handle_count_orders(tool_input) elif tool_name == "fetch_orders": return handle_fetch_orders(tool_input) elif tool_name == "get_order_details": return handle_get_order_details(tool_input) elif tool_name == "search_orders": return handle_search_orders(tool_input) elif tool_name == "get_incomplete_orders": return handle_get_incomplete_orders(tool_input) elif tool_name == "count_drivers": return handle_count_drivers(tool_input) elif tool_name == "fetch_drivers": return handle_fetch_drivers(tool_input) elif tool_name == "get_driver_details": return handle_get_driver_details(tool_input) elif tool_name == "search_drivers": return handle_search_drivers(tool_input) elif tool_name == "get_available_drivers": return handle_get_available_drivers(tool_input) elif tool_name == "update_order": return handle_update_order(tool_input) elif tool_name == "delete_order": return handle_delete_order(tool_input) elif tool_name == "update_driver": return handle_update_driver(tool_input) elif tool_name == "delete_driver": return handle_delete_driver(tool_input) else: return { "success": False, "error": f"Unknown tool: {tool_name}" } except Exception as e: logger.error(f"Tool execution error ({tool_name}): {e}") return { "success": False, "error": str(e) } def handle_geocode_address(tool_input: dict) -> dict: """ Execute geocoding tool Args: tool_input: Dict with 'address' key Returns: Geocoding result """ address = tool_input.get("address", "") if not address: return { "success": False, "error": "Address is required" } logger.info(f"Geocoding address: {address}") result = geocoding_service.geocode(address) return { "success": True, "latitude": result["lat"], "longitude": result["lng"], "formatted_address": result["formatted_address"], "confidence": result["confidence"], "message": f"Address geocoded successfully ({result['confidence']})" } def handle_calculate_route(tool_input: dict) -> dict: """ Execute route calculation tool Args: tool_input: Dict with origin, destination, mode, vehicle_type, alternatives, include_steps Returns: Route calculation result with distance, duration, and optional directions """ import math from datetime import datetime origin = tool_input.get("origin", "") destination = tool_input.get("destination", "") mode = tool_input.get("mode", "driving") vehicle_type = tool_input.get("vehicle_type", "car") alternatives = tool_input.get("alternatives", False) include_steps = tool_input.get("include_steps", False) if not origin or not destination: return { "success": False, "error": "Both origin and destination are required" } # Map vehicle type to travel mode VEHICLE_TYPE_TO_MODE = { "motorcycle": "TWO_WHEELER", # Use proper TWO_WHEELER mode for motorcycle-specific routing "bicycle": "bicycling", "car": "driving", "van": "driving", "truck": "driving" # Note: No truck-specific routing available in API } # Override mode if vehicle_type is provided if vehicle_type in VEHICLE_TYPE_TO_MODE: mode = VEHICLE_TYPE_TO_MODE[vehicle_type] logger.info(f"Vehicle type '{vehicle_type}' mapped to mode '{mode}'") logger.info(f"Calculating route: {origin} → {destination} (mode: {mode}, vehicle: {vehicle_type})") # Triple fallback: Routes API → Directions API → Mock if geocoding_service.use_mock: logger.info("Using mock route calculation (no API key)") result = _calculate_route_mock(origin, destination, mode) else: try: # Try Routes API first (recommended, more accurate) logger.info("Attempting Routes API (recommended)") result = _calculate_route_routes_api(origin, destination, mode, alternatives, include_steps, vehicle_type, tool_input) except Exception as e: logger.warning(f"Routes API failed: {e}") try: # Fall back to Directions API (legacy) logger.info("Falling back to Directions API (legacy)") result = _calculate_route_google(origin, destination, mode, alternatives, include_steps) except Exception as e2: # Fall back to mock calculation logger.error(f"Directions API also failed: {e2}, falling back to mock") result = _calculate_route_mock(origin, destination, mode) # Add vehicle type to result for use in intelligent routing result["vehicle_type"] = vehicle_type return result def _calculate_route_google(origin: str, destination: str, mode: str, alternatives: bool, include_steps: bool) -> dict: """Calculate route using Google Maps Directions API""" try: # Map our mode to Google Maps mode mode_mapping = { "driving": "driving", "walking": "walking", "bicycling": "bicycling", "transit": "transit" } gmaps_mode = mode_mapping.get(mode, "driving") # Call Google Maps Directions API result = geocoding_service.gmaps_client.directions( origin=origin, destination=destination, mode=gmaps_mode, alternatives=alternatives, departure_time="now" # Get real-time traffic data ) if not result: logger.warning(f"Google Maps Directions API found no routes for: {origin} → {destination}") return _calculate_route_mock(origin, destination, mode) # Get first (best) route route = result[0] leg = route['legs'][0] # First leg (direct route) # Extract route information distance_meters = leg['distance']['value'] distance_text = leg['distance']['text'] duration_seconds = leg['duration']['value'] duration_text = leg['duration']['text'] # Get traffic-aware duration if available duration_in_traffic = leg.get('duration_in_traffic') if duration_in_traffic: traffic_duration_seconds = duration_in_traffic['value'] traffic_duration_text = duration_in_traffic['text'] else: traffic_duration_seconds = duration_seconds traffic_duration_text = duration_text # Get route summary route_summary = route.get('summary', 'Via main roads') # Prepare response response = { "success": True, "origin": leg['start_address'], "destination": leg['end_address'], "distance": { "meters": distance_meters, "text": distance_text }, "duration": { "seconds": duration_seconds, "text": duration_text }, "duration_in_traffic": { "seconds": traffic_duration_seconds, "text": traffic_duration_text }, "mode": mode, "route_summary": route_summary, "confidence": "high (Google Maps API)" } # Add turn-by-turn steps if requested if include_steps and 'steps' in leg: steps = [] for step in leg['steps']: steps.append({ "instruction": step.get('html_instructions', '').replace('', '').replace('', ''), "distance": step['distance']['text'], "duration": step['duration']['text'] }) response["steps"] = steps response["total_steps"] = len(steps) # Add alternative routes if requested if alternatives and len(result) > 1: alt_routes = [] for alt_route in result[1:]: # Skip first route (already returned) alt_leg = alt_route['legs'][0] alt_routes.append({ "route_summary": alt_route.get('summary', 'Alternative route'), "distance": alt_leg['distance']['text'], "duration": alt_leg['duration']['text'] }) response["alternatives"] = alt_routes response["alternatives_count"] = len(alt_routes) logger.info(f"Route calculated: {distance_text}, {traffic_duration_text}") return response except Exception as e: logger.error(f"Google Maps Directions API error: {e}") raise def _location_to_latlng(location: str) -> dict: """ Convert location (address or coordinates) to lat/lng dict for Routes API Args: location: Either an address string or "lat,lng" coordinates Returns: Dict with {"latitude": float, "longitude": float} """ # Check if already in "lat,lng" format if ',' in location: parts = location.split(',') if len(parts) == 2: try: lat = float(parts[0].strip()) lng = float(parts[1].strip()) return {"latitude": lat, "longitude": lng} except ValueError: pass # Not valid coordinates, treat as address # Geocode the address geocoded = geocoding_service.geocode(location) return { "latitude": geocoded["lat"], "longitude": geocoded["lng"] } def _calculate_route_routes_api(origin: str, destination: str, mode: str, alternatives: bool, include_steps: bool, vehicle_type: str = "car", tool_input: dict = None) -> dict: """ Calculate route using Google Routes API (new, recommended) This uses the modern Routes API which provides better accuracy, real-time traffic data, vehicle-specific routing, and additional features. Args: origin: Starting location (address or "lat,lng") destination: Ending location (address or "lat,lng") mode: Travel mode (driving, walking, bicycling, transit, TWO_WHEELER) alternatives: Whether to return alternative routes include_steps: Whether to include turn-by-turn directions vehicle_type: Vehicle type (motorcycle, bicycle, car, van, truck) tool_input: Original tool input dict for route modifiers Returns: Route calculation result dict with vehicle-specific data """ if tool_input is None: tool_input = {} import requests import re try: # Convert locations to lat/lng origin_latlng = _location_to_latlng(origin) dest_latlng = _location_to_latlng(destination) # Map travel modes to Routes API format mode_mapping = { "driving": "DRIVE", "walking": "WALK", "bicycling": "BICYCLE", "transit": "TRANSIT", "TWO_WHEELER": "TWO_WHEELER" # Motorcycle-specific routing } routes_mode = mode_mapping.get(mode, "DRIVE") # Prepare API request url = "https://routes.googleapis.com/directions/v2:computeRoutes" # Build enhanced field mask for vehicle-specific data field_mask_parts = [ "routes.duration", "routes.staticDuration", # Duration without traffic "routes.distanceMeters", "routes.polyline.encodedPolyline", "routes.legs", "routes.description", "routes.localizedValues", "routes.routeLabels", # Get route type labels (FUEL_EFFICIENT, etc.) "routes.travelAdvisory.speedReadingIntervals", # Traffic segments "routes.travelAdvisory.tollInfo" # Toll information ] # Add fuel consumption for DRIVE mode if routes_mode == "DRIVE": field_mask_parts.append("routes.travelAdvisory.fuelConsumptionMicroliters") headers = { "Content-Type": "application/json", "X-Goog-Api-Key": geocoding_service.google_maps_key, "X-Goog-FieldMask": ",".join(field_mask_parts) } # Build request body body = { "origin": { "location": { "latLng": origin_latlng } }, "destination": { "location": { "latLng": dest_latlng } }, "travelMode": routes_mode, "computeAlternativeRoutes": alternatives, "languageCode": "en-US", "units": "METRIC" } # Add routing preference only for DRIVE and TWO_WHEELER (not for WALK/BICYCLE) if routes_mode in ["DRIVE", "TWO_WHEELER"]: body["routingPreference"] = "TRAFFIC_AWARE" # Add route modifiers based on vehicle type route_modifiers = {} # Vehicle emission type for DRIVE mode (cars, vans, trucks) if routes_mode == "DRIVE": emission_type = tool_input.get("emission_type", "GASOLINE").upper() if emission_type in ["GASOLINE", "ELECTRIC", "HYBRID", "DIESEL"]: route_modifiers["vehicleInfo"] = { "emissionType": emission_type } # Avoid options (applicable to DRIVE and TWO_WHEELER) if routes_mode in ["DRIVE", "TWO_WHEELER"]: if tool_input.get("avoid_tolls", False): route_modifiers["avoidTolls"] = True if tool_input.get("avoid_highways", False): route_modifiers["avoidHighways"] = True if tool_input.get("avoid_ferries", False): route_modifiers["avoidFerries"] = True if route_modifiers: body["routeModifiers"] = route_modifiers # Add extra computations for enhanced data extra_computations = [] # Traffic data for DRIVE and TWO_WHEELER if routes_mode in ["DRIVE", "TWO_WHEELER"]: extra_computations.append("TRAFFIC_ON_POLYLINE") # Toll information (unless avoiding tolls) if not tool_input.get("avoid_tolls", False): extra_computations.append("TOLLS") # Fuel consumption for DRIVE mode only if routes_mode == "DRIVE": extra_computations.append("FUEL_CONSUMPTION") if extra_computations: body["extraComputations"] = extra_computations # Request fuel-efficient alternative for DRIVE mode if routes_mode == "DRIVE" and tool_input.get("request_fuel_efficient", False): body["requestedReferenceRoutes"] = ["FUEL_EFFICIENT"] # Make API request logger.info(f"Calling Routes API: {origin} → {destination} (mode: {routes_mode})") response = requests.post(url, headers=headers, json=body, timeout=10) if response.status_code != 200: logger.error(f"Routes API error: {response.status_code} - {response.text}") raise Exception(f"Routes API returned {response.status_code}: {response.text[:200]}") data = response.json() if not data.get("routes"): logger.warning(f"Routes API found no routes for: {origin} → {destination}") return _calculate_route_google(origin, destination, mode, alternatives, include_steps) # Get first (best) route route = data["routes"][0] # Extract distance distance_meters = route.get("distanceMeters", 0) if distance_meters >= 1000: distance_text = f"{distance_meters/1000:.1f} km" else: distance_text = f"{distance_meters} m" # Helper function to format duration def format_duration(seconds): hours = seconds // 3600 minutes = (seconds % 3600) // 60 if hours > 0: return f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}" else: return f"{minutes} min{'s' if minutes != 1 else ''}" # Extract duration WITH traffic (format: "123s" or "123.456s") duration_str = route.get("duration", "0s") duration_with_traffic_seconds = int(float(re.sub(r'[^\d.]', '', duration_str))) # Extract static duration (WITHOUT traffic) static_duration_str = route.get("staticDuration", duration_str) static_duration_seconds = int(float(re.sub(r'[^\d.]', '', static_duration_str))) # Calculate traffic delay traffic_delay_seconds = duration_with_traffic_seconds - static_duration_seconds # Get route description/summary and labels route_summary = route.get("description", "Route via Routes API") route_labels = route.get("routeLabels", []) # Extract travel advisory information travel_advisory = route.get("travelAdvisory", {}) # Toll information toll_info = travel_advisory.get("tollInfo") has_tolls = toll_info is not None # Fuel consumption (DRIVE mode only) fuel_consumption_ml = travel_advisory.get("fuelConsumptionMicroliters") fuel_consumption_liters = None if fuel_consumption_ml: fuel_consumption_liters = float(fuel_consumption_ml) / 1_000_000 # Traffic segments speed_intervals = travel_advisory.get("speedReadingIntervals", []) has_traffic_data = len(speed_intervals) > 0 # Get origin and destination addresses (geocode if needed) origin_geocoded = geocoding_service.geocode(origin) dest_geocoded = geocoding_service.geocode(destination) # Build enhanced response with vehicle-specific data response_data = { "success": True, "origin": origin_geocoded["formatted_address"], "destination": dest_geocoded["formatted_address"], "distance": { "meters": distance_meters, "text": distance_text }, "duration": { "seconds": static_duration_seconds, "text": format_duration(static_duration_seconds) }, "duration_in_traffic": { "seconds": duration_with_traffic_seconds, "text": format_duration(duration_with_traffic_seconds) }, "traffic_delay": { "seconds": traffic_delay_seconds, "text": format_duration(traffic_delay_seconds) if traffic_delay_seconds > 0 else "No delay" }, "mode": mode, "vehicle_type": vehicle_type, "route_summary": route_summary, "route_labels": route_labels, "confidence": "high (Routes API with real-time traffic)" } # Add toll information if available if has_tolls: response_data["toll_info"] = { "has_tolls": True, "details": "Toll roads on route" } else: response_data["toll_info"] = {"has_tolls": False} # Add fuel consumption if available (DRIVE mode) if fuel_consumption_liters is not None: response_data["fuel_consumption"] = { "liters": round(fuel_consumption_liters, 2), "text": f"{fuel_consumption_liters:.2f} L" } # Add traffic data availability indicator if has_traffic_data: response_data["traffic_data_available"] = True response_data["traffic_segments_count"] = len(speed_intervals) # Add beta warnings for specific modes if routes_mode == "TWO_WHEELER": response_data["warning"] = ( "Motorcycle routing uses TWO_WHEELER mode (beta). " "May occasionally miss clear paths. Billed at higher rate." ) elif routes_mode == "BICYCLE": response_data["warning"] = ( "Bicycle routing is in beta and may occasionally miss clear bike paths." ) # Add turn-by-turn steps if requested if include_steps and route.get("legs"): steps = [] for leg in route["legs"]: if leg.get("steps"): for step in leg["steps"]: # Routes API has different step format, adapt as needed steps.append({ "instruction": step.get("navigationInstruction", {}).get("instructions", "Continue"), "distance": step.get("distanceMeters", 0), "duration": step.get("staticDuration", "0s") }) if steps: response_data["steps"] = steps response_data["steps_count"] = len(steps) # Add alternative routes if requested and available if alternatives and len(data["routes"]) > 1: alt_routes = [] for alt_route in data["routes"][1:]: alt_distance = alt_route.get("distanceMeters", 0) alt_duration_str = alt_route.get("duration", "0s") alt_duration_sec = int(float(re.sub(r'[^\d.]', '', alt_duration_str))) alt_hours = alt_duration_sec // 3600 alt_minutes = (alt_duration_sec % 3600) // 60 if alt_hours > 0: alt_duration_text = f"{alt_hours} hour{'s' if alt_hours > 1 else ''} {alt_minutes} min" else: alt_duration_text = f"{alt_minutes} min" alt_routes.append({ "route_summary": alt_route.get("description", "Alternative route"), "distance": f"{alt_distance/1000:.1f} km" if alt_distance >= 1000 else f"{alt_distance} m", "duration": alt_duration_text }) response_data["alternatives"] = alt_routes response_data["alternatives_count"] = len(alt_routes) logger.info(f"Routes API: {distance_text}, {format_duration(duration_with_traffic_seconds)}") return response_data except Exception as e: logger.error(f"Routes API error: {e}") raise # City-specific traffic profiles for realistic routing CITY_PROFILES = { "dhaka": { "name": "Dhaka, Bangladesh", "peak_speed_kmh": 8, # 8 km/h during peak hours (7-10 AM, 5-9 PM) "offpeak_speed_kmh": 18, # 18 km/h during off-peak hours "night_speed_kmh": 25, # 25 km/h at night (10 PM - 6 AM) "signals_per_km": 4, # 4 traffic signals per km in urban areas "signal_delay_sec": 50, # 50 seconds average per signal "intersection_delay_per_km": 30, # 30 seconds per km for intersections "congestion_multiplier": 2.5, # Heavy congestion factor "keywords": ["dhaka", "bangladesh"] }, "default": { "name": "Default Urban Area", "peak_speed_kmh": 20, # 20 km/h during peak hours "offpeak_speed_kmh": 30, # 30 km/h during off-peak hours "night_speed_kmh": 40, # 40 km/h at night "signals_per_km": 2, # 2 traffic signals per km "signal_delay_sec": 45, # 45 seconds average per signal "intersection_delay_per_km": 20, # 20 seconds per km "congestion_multiplier": 1.5, # Moderate congestion "keywords": [] } } def _calculate_route_mock(origin: str, destination: str, mode: str) -> dict: """Mock route calculation with realistic urban traffic modeling""" import math from datetime import datetime # Try to geocode both locations to get coordinates try: origin_geocoded = geocoding_service.geocode(origin) dest_geocoded = geocoding_service.geocode(destination) origin_lat = origin_geocoded["lat"] origin_lng = origin_geocoded["lng"] dest_lat = dest_geocoded["lat"] dest_lng = dest_geocoded["lng"] # Detect city from destination address dest_address_lower = dest_geocoded["formatted_address"].lower() city_profile = CITY_PROFILES["default"] for city_key, profile in CITY_PROFILES.items(): if city_key != "default": for keyword in profile["keywords"]: if keyword in dest_address_lower: city_profile = profile logger.info(f"Detected city: {profile['name']}") break if city_profile != CITY_PROFILES["default"]: break # Detect time of day current_hour = datetime.now().hour if 7 <= current_hour < 10 or 17 <= current_hour < 21: time_period = "peak" speed_kmh = city_profile["peak_speed_kmh"] elif 22 <= current_hour or current_hour < 6: time_period = "night" speed_kmh = city_profile["night_speed_kmh"] else: time_period = "offpeak" speed_kmh = city_profile["offpeak_speed_kmh"] logger.info(f"Time period: {time_period}, base speed: {speed_kmh} km/h") # Calculate straight-line distance using Haversine formula R = 6371000 # Earth radius in meters phi1 = math.radians(origin_lat) phi2 = math.radians(dest_lat) delta_phi = math.radians(dest_lat - origin_lat) delta_lambda = math.radians(dest_lng - origin_lng) a = math.sin(delta_phi/2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda/2)**2 c = 2 * math.atan2(math.sqrt(a), math.sqrt(1-a)) distance_meters = R * c # Estimate driving distance based on mode if mode == "driving": distance_meters *= 1.3 # 30% longer for road network speed_mps = speed_kmh / 3.6 # Convert km/h to m/s elif mode == "walking": distance_meters *= 1.2 speed_mps = 1.4 # ~5 km/h walking speed elif mode == "bicycling": distance_meters *= 1.25 speed_mps = 4.5 # ~16 km/h cycling speed elif mode == "transit": distance_meters *= 1.4 speed_mps = 8.9 # ~32 km/h transit speed else: speed_mps = speed_kmh / 3.6 # Calculate base duration from speed base_duration_seconds = int(distance_meters / speed_mps) # Add realistic urban delays for driving mode traffic_duration_seconds = base_duration_seconds if mode == "driving": distance_km = distance_meters / 1000.0 # Add traffic signal delays num_signals = int(distance_km * city_profile["signals_per_km"]) signal_delay = num_signals * city_profile["signal_delay_sec"] # Add intersection delays intersection_delay = int(distance_km * city_profile["intersection_delay_per_km"]) # Apply congestion multiplier for peak hours if time_period == "peak": congestion_delay = int(base_duration_seconds * (city_profile["congestion_multiplier"] - 1.0)) else: congestion_delay = 0 # Calculate total traffic-aware duration traffic_duration_seconds = base_duration_seconds + signal_delay + intersection_delay + congestion_delay # Apply minimum travel time (2 minutes) MIN_TRAVEL_TIME = 120 if traffic_duration_seconds < MIN_TRAVEL_TIME: traffic_duration_seconds = MIN_TRAVEL_TIME logger.info(f"Urban delays - Signals: {signal_delay}s, Intersections: {intersection_delay}s, Congestion: {congestion_delay}s") # Format distance if distance_meters >= 1000: distance_text = f"{distance_meters/1000:.1f} km" else: distance_text = f"{int(distance_meters)} m" # Format base duration hours = base_duration_seconds // 3600 minutes = (base_duration_seconds % 3600) // 60 if hours > 0: base_duration_text = f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}" else: base_duration_text = f"{minutes} min{'s' if minutes != 1 else ''}" # Format traffic-aware duration hours = traffic_duration_seconds // 3600 minutes = (traffic_duration_seconds % 3600) // 60 if hours > 0: traffic_duration_text = f"{hours} hour{'s' if hours > 1 else ''} {minutes} min{'s' if minutes != 1 else ''}" else: traffic_duration_text = f"{minutes} min{'s' if minutes != 1 else ''}" logger.info(f"Mock route calculated: {distance_text}, {traffic_duration_text} (base: {base_duration_text}, city: {city_profile['name']})") return { "success": True, "origin": origin_geocoded["formatted_address"], "destination": dest_geocoded["formatted_address"], "distance": { "meters": int(distance_meters), "text": distance_text }, "duration": { "seconds": base_duration_seconds, "text": base_duration_text }, "duration_in_traffic": { "seconds": traffic_duration_seconds, "text": traffic_duration_text }, "mode": mode, "route_summary": f"Direct route via {city_profile['name']} ({time_period} traffic)", "confidence": "low (mock calculation with urban traffic modeling)" } except Exception as e: logger.error(f"Mock route calculation failed: {e}") return { "success": False, "error": f"Could not calculate route: {str(e)}" } def handle_create_order(tool_input: dict) -> dict: """ Execute order creation tool Args: tool_input: Dict with order fields (expected_delivery_time now REQUIRED) Returns: Order creation result """ # Extract fields with defaults customer_name = tool_input.get("customer_name") customer_phone = tool_input.get("customer_phone") customer_email = tool_input.get("customer_email") delivery_address = tool_input.get("delivery_address") delivery_lat = tool_input.get("delivery_lat") delivery_lng = tool_input.get("delivery_lng") expected_delivery_time_str = tool_input.get("expected_delivery_time") priority = tool_input.get("priority", "standard") special_instructions = tool_input.get("special_instructions") weight_kg = tool_input.get("weight_kg", 5.0) volume_m3 = tool_input.get("volume_m3", 1.0) is_fragile = tool_input.get("is_fragile", False) requires_cold_storage = tool_input.get("requires_cold_storage", False) requires_signature = tool_input.get("requires_signature", False) sla_grace_period_minutes = tool_input.get("sla_grace_period_minutes", 15) # Validate required fields (expected_delivery_time is now MANDATORY) if not all([customer_name, delivery_address, delivery_lat, delivery_lng, expected_delivery_time_str]): return { "success": False, "error": "Missing required fields: customer_name, delivery_address, delivery_lat, delivery_lng, expected_delivery_time" } # Generate order ID with microseconds to prevent collisions now = datetime.now() order_id = f"ORD-{now.strftime('%Y%m%d%H%M%S%f')[:18]}" # YYYYMMDDHHMMSSμμμμμμ (18 chars) # Parse and validate expected_delivery_time try: expected_delivery_time = datetime.fromisoformat(expected_delivery_time_str.replace('Z', '+00:00')) # Validate it's in the future if expected_delivery_time <= now: return { "success": False, "error": f"expected_delivery_time must be in the future. Provided: {expected_delivery_time_str}, Current time: {now.isoformat()}" } except (ValueError, AttributeError) as e: return { "success": False, "error": f"Invalid expected_delivery_time format. Must be ISO 8601 format (e.g., '2025-11-15T18:00:00'). Error: {str(e)}" } # Handle time window (kept for backward compatibility) time_window_end_str = tool_input.get("time_window_end") if time_window_end_str: try: time_window_end = datetime.fromisoformat(time_window_end_str.replace('Z', '+00:00')) except: time_window_end = expected_delivery_time # Use expected time as fallback else: time_window_end = expected_delivery_time # Default to expected delivery time time_window_start = now + timedelta(hours=2) # Insert into database query = """ INSERT INTO orders ( order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, expected_delivery_time, priority, weight_kg, volume_m3, is_fragile, requires_cold_storage, requires_signature, status, special_instructions, sla_grace_period_minutes ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ params = ( order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, expected_delivery_time, priority, weight_kg, volume_m3, is_fragile, requires_cold_storage, requires_signature, "pending", special_instructions, sla_grace_period_minutes ) try: execute_write(query, params) logger.info(f"Order created: {order_id}, expected delivery: {expected_delivery_time.strftime('%Y-%m-%d %H:%M')}") return { "success": True, "order_id": order_id, "status": "pending", "customer": customer_name, "address": delivery_address, "expected_delivery": expected_delivery_time.strftime("%Y-%m-%d %H:%M"), "sla_grace_period_minutes": sla_grace_period_minutes, "priority": priority, "message": f"Order {order_id} created successfully! Expected delivery: {expected_delivery_time.strftime('%Y-%m-%d %H:%M')}" } except Exception as e: logger.error(f"Database error creating order: {e}") return { "success": False, "error": f"Failed to create order: {str(e)}" } def handle_create_driver(tool_input: dict) -> dict: """ Execute driver creation tool Args: tool_input: Dict with driver fields Returns: Driver creation result """ # Extract fields with defaults name = tool_input.get("name") phone = tool_input.get("phone") email = tool_input.get("email") vehicle_type = tool_input.get("vehicle_type") # No default - REQUIRED vehicle_plate = tool_input.get("vehicle_plate") capacity_kg = tool_input.get("capacity_kg", 1000.0) capacity_m3 = tool_input.get("capacity_m3", 12.0) current_lat = tool_input.get("current_lat") # No default - REQUIRED current_lng = tool_input.get("current_lng") # No default - REQUIRED # Convert skills to regular list (handles protobuf RepeatedComposite) skills_raw = tool_input.get("skills", []) skills = list(skills_raw) if skills_raw else [] status = tool_input.get("status", "active") # Validate ALL required fields (name, vehicle_type, current_lat, current_lng) if not all([name, vehicle_type, current_lat is not None, current_lng is not None]): return { "success": False, "error": "Missing required fields: name, vehicle_type, current_lat, current_lng. All fields are mandatory." } # Validate coordinates are valid numbers try: current_lat = float(current_lat) current_lng = float(current_lng) except (ValueError, TypeError): return { "success": False, "error": "current_lat and current_lng must be valid numbers" } # Validate coordinates are within valid ranges if not (-90 <= current_lat <= 90): return { "success": False, "error": f"Invalid latitude {current_lat}. Must be between -90 and 90" } if not (-180 <= current_lng <= 180): return { "success": False, "error": f"Invalid longitude {current_lng}. Must be between -180 and 180" } # Generate driver ID with microseconds to prevent collisions now = datetime.now() driver_id = f"DRV-{now.strftime('%Y%m%d%H%M%S%f')[:18]}" # YYYYMMDDHHMMSSμμμμμμ (18 chars) # Insert into database query = """ INSERT INTO drivers ( driver_id, name, phone, email, current_lat, current_lng, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ # Convert skills list to JSON import json skills_json = json.dumps(skills) if skills else json.dumps([]) params = ( driver_id, name, phone, email, current_lat, current_lng, now, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills_json ) try: execute_write(query, params) logger.info(f"Driver created: {driver_id}") return { "success": True, "driver_id": driver_id, "name": name, "status": status, "vehicle_type": vehicle_type, "vehicle_plate": vehicle_plate, "capacity_kg": capacity_kg, "skills": skills, "message": f"Driver {driver_id} ({name}) created successfully!" } except Exception as e: logger.error(f"Database error creating driver: {e}") return { "success": False, "error": f"Failed to create driver: {str(e)}" } def handle_update_order(tool_input: dict) -> dict: """ Execute order update tool with assignment cascading logic Args: tool_input: Dict with order_id and fields to update Returns: Update result """ import json order_id = tool_input.get("order_id") # Validate required field if not order_id: return { "success": False, "error": "Missing required field: order_id" } # Check if order exists and get current status check_query = "SELECT order_id, status, assigned_driver_id FROM orders WHERE order_id = %s" existing = execute_query(check_query, (order_id,)) if not existing: return { "success": False, "error": f"Order {order_id} not found" } current_status = existing[0].get("status") current_assigned_driver = existing[0].get("assigned_driver_id") # Auto-geocode if delivery address is updated without coordinates if "delivery_address" in tool_input and ("delivery_lat" not in tool_input or "delivery_lng" not in tool_input): from chat.geocoding import GeocodingService geocoding_service = GeocodingService() try: geocode_result = geocoding_service.geocode(tool_input["delivery_address"]) tool_input["delivery_lat"] = geocode_result["lat"] tool_input["delivery_lng"] = geocode_result["lng"] logger.info(f"Auto-geocoded delivery address: {geocode_result['formatted_address']}") except Exception as e: logger.warning(f"Failed to geocode address, skipping coordinate update: {e}") # Handle status changes with assignment cascading logic new_status = tool_input.get("status") cascading_actions = [] if new_status and new_status != current_status: # Check if order has active assignment assignment_check = execute_query(""" SELECT assignment_id, status, driver_id FROM assignments WHERE order_id = %s AND status IN ('active', 'in_progress') LIMIT 1 """, (order_id,)) has_active_assignment = len(assignment_check) > 0 # Validate status transitions based on assignment state if new_status == "pending" and current_status == "assigned": if has_active_assignment: # Changing assigned order back to pending - must cancel assignment assignment_id = assignment_check[0]["assignment_id"] driver_id = assignment_check[0]["driver_id"] # Cancel the assignment execute_write(""" UPDATE assignments SET status = 'cancelled', updated_at = %s WHERE assignment_id = %s """, (datetime.now(), assignment_id)) # Clear assigned_driver_id from order execute_write(""" UPDATE orders SET assigned_driver_id = NULL WHERE order_id = %s """, (order_id,)) # Check if driver has other active assignments other_assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) if other_assignments[0]["count"] == 0: # Set driver back to active if no other assignments execute_write(""" UPDATE drivers SET status = 'active', updated_at = %s WHERE driver_id = %s """, (datetime.now(), driver_id)) cascading_actions.append(f"Driver {driver_id} set to active (no other assignments)") cascading_actions.append(f"Assignment {assignment_id} cancelled and removed") elif new_status == "cancelled": if has_active_assignment: # Cancel active assignment when order is cancelled assignment_id = assignment_check[0]["assignment_id"] driver_id = assignment_check[0]["driver_id"] execute_write(""" UPDATE assignments SET status = 'cancelled', updated_at = %s WHERE assignment_id = %s """, (datetime.now(), assignment_id)) # Clear assigned_driver_id execute_write(""" UPDATE orders SET assigned_driver_id = NULL WHERE order_id = %s """, (order_id,)) # Check if driver has other active assignments other_assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) if other_assignments[0]["count"] == 0: execute_write(""" UPDATE drivers SET status = 'active', updated_at = %s WHERE driver_id = %s """, (datetime.now(), driver_id)) cascading_actions.append(f"Driver {driver_id} set to active") cascading_actions.append(f"Assignment {assignment_id} cancelled") elif new_status in ["delivered", "failed"] and has_active_assignment: # Note: This should normally be handled by update_assignment tool # but we allow it here for flexibility assignment_id = assignment_check[0]["assignment_id"] final_status = "completed" if new_status == "delivered" else "failed" execute_write(""" UPDATE assignments SET status = %s, updated_at = %s WHERE assignment_id = %s """, (final_status, datetime.now(), assignment_id)) cascading_actions.append(f"Assignment {assignment_id} marked as {final_status}") # Build UPDATE query dynamically based on provided fields update_fields = [] params = [] # Map of field names to their database columns updateable_fields = { "customer_name": "customer_name", "customer_phone": "customer_phone", "customer_email": "customer_email", "delivery_address": "delivery_address", "delivery_lat": "delivery_lat", "delivery_lng": "delivery_lng", "status": "status", "priority": "priority", "special_instructions": "special_instructions", "time_window_end": "time_window_end", "payment_status": "payment_status", "weight_kg": "weight_kg", "order_value": "order_value" } for field, column in updateable_fields.items(): if field in tool_input: update_fields.append(f"{column} = %s") params.append(tool_input[field]) if not update_fields: return { "success": False, "error": "No fields provided to update" } # Always update the updated_at timestamp update_fields.append("updated_at = %s") params.append(datetime.now()) # Add order_id for WHERE clause params.append(order_id) # Execute update query = f""" UPDATE orders SET {', '.join(update_fields)} WHERE order_id = %s """ try: execute_write(query, tuple(params)) logger.info(f"Order updated: {order_id}") result = { "success": True, "order_id": order_id, "updated_fields": list(updateable_fields.keys() & tool_input.keys()), "message": f"Order {order_id} updated successfully!" } if cascading_actions: result["cascading_actions"] = cascading_actions return result except Exception as e: logger.error(f"Database error updating order: {e}") return { "success": False, "error": f"Failed to update order: {str(e)}" } def handle_delete_all_orders(tool_input: dict) -> dict: """ Delete all orders (bulk delete) Args: tool_input: Dict with confirm flag and optional status filter Returns: Deletion result with count """ confirm = tool_input.get("confirm", False) status_filter = tool_input.get("status") # Optional: delete only specific status if not confirm: return { "success": False, "error": "Bulk deletion requires confirm=true for safety" } try: # Check for active assignments first active_assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE status IN ('active', 'in_progress') """) active_count = active_assignments[0]['count'] if active_count > 0: return { "success": False, "error": f"Cannot delete orders: {active_count} active assignment(s) exist. Cancel or complete them first." } # Build delete query based on status filter if status_filter: count_query = "SELECT COUNT(*) as count FROM orders WHERE status = %s" delete_query = "DELETE FROM orders WHERE status = %s" params = (status_filter,) else: count_query = "SELECT COUNT(*) as count FROM orders" delete_query = "DELETE FROM orders" params = () # Get count before deletion count_result = execute_query(count_query, params) total_count = count_result[0]['count'] if total_count == 0: return { "success": True, "deleted_count": 0, "message": "No orders to delete" } # Execute bulk delete execute_write(delete_query, params) logger.info(f"Bulk deleted {total_count} orders") return { "success": True, "deleted_count": total_count, "message": f"Successfully deleted {total_count} order(s)" } except Exception as e: logger.error(f"Database error bulk deleting orders: {e}") return { "success": False, "error": f"Failed to bulk delete orders: {str(e)}" } def handle_delete_order(tool_input: dict) -> dict: """ Execute order deletion tool with assignment safety checks Args: tool_input: Dict with order_id and confirm flag Returns: Deletion result """ order_id = tool_input.get("order_id") confirm = tool_input.get("confirm", False) # Validate required fields if not order_id: return { "success": False, "error": "Missing required field: order_id" } if not confirm: return { "success": False, "error": "Deletion not confirmed. Set confirm=true to proceed." } # Check if order exists check_query = "SELECT order_id, status FROM orders WHERE order_id = %s" existing = execute_query(check_query, (order_id,)) if not existing: return { "success": False, "error": f"Order {order_id} not found" } order_status = existing[0].get("status") # Check for active assignments assignment_check = execute_query(""" SELECT assignment_id, status, driver_id FROM assignments WHERE order_id = %s AND status IN ('active', 'in_progress') """, (order_id,)) if assignment_check: # Warn about active assignments that will be cascade deleted assignment_count = len(assignment_check) assignment_ids = [a["assignment_id"] for a in assignment_check] return { "success": False, "error": f"Cannot delete order {order_id}: it has {assignment_count} active assignment(s): {', '.join(assignment_ids)}. Please cancel or complete the assignment(s) first using update_assignment or unassign_order.", "active_assignments": assignment_ids } # Check for any completed assignments (these will be cascade deleted) completed_assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments WHERE order_id = %s AND status IN ('completed', 'failed', 'cancelled') """, (order_id,)) cascading_info = [] if completed_assignments[0]["count"] > 0: cascading_info.append(f"{completed_assignments[0]['count']} completed/failed/cancelled assignment(s) will be cascade deleted") # Delete the order (will cascade to assignments via FK) query = "DELETE FROM orders WHERE order_id = %s" try: execute_write(query, (order_id,)) logger.info(f"Order deleted: {order_id}") result = { "success": True, "order_id": order_id, "message": f"Order {order_id} has been permanently deleted." } if cascading_info: result["cascading_info"] = cascading_info return result except Exception as e: logger.error(f"Database error deleting order: {e}") return { "success": False, "error": f"Failed to delete order: {str(e)}" } def handle_update_driver(tool_input: dict) -> dict: """ Execute driver update tool with assignment validation Args: tool_input: Dict with driver_id and fields to update Returns: Update result """ import json driver_id = tool_input.get("driver_id") # Validate required field if not driver_id: return { "success": False, "error": "Missing required field: driver_id" } # Check if driver exists and get current status check_query = "SELECT driver_id, status FROM drivers WHERE driver_id = %s" existing = execute_query(check_query, (driver_id,)) if not existing: return { "success": False, "error": f"Driver {driver_id} not found" } current_status = existing[0].get("status") # Validate status changes against active assignments new_status = tool_input.get("status") if new_status and new_status != current_status: # Check for active assignments assignment_check = execute_query(""" SELECT assignment_id, status, order_id FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') """, (driver_id,)) has_active_assignments = len(assignment_check) > 0 # Prevent setting driver to offline/inactive when they have active assignments if new_status in ["offline", "inactive"] and has_active_assignments: assignment_count = len(assignment_check) assignment_ids = [a["assignment_id"] for a in assignment_check] return { "success": False, "error": f"Cannot set driver {driver_id} to '{new_status}': driver has {assignment_count} active assignment(s): {', '.join(assignment_ids)}. Please complete or cancel assignments first.", "active_assignments": assignment_ids } # Note: Setting driver to 'active' when they have assignments is allowed # The system manages 'busy' status automatically via assignment creation # But we allow manual override to 'active' for edge cases # Build UPDATE query dynamically based on provided fields update_fields = [] params = [] # Map of field names to their database columns updateable_fields = { "name": "name", "phone": "phone", "email": "email", "status": "status", "vehicle_type": "vehicle_type", "vehicle_plate": "vehicle_plate", "capacity_kg": "capacity_kg", "capacity_m3": "capacity_m3", "current_lat": "current_lat", "current_lng": "current_lng" } for field, column in updateable_fields.items(): if field in tool_input: update_fields.append(f"{column} = %s") params.append(tool_input[field]) # Handle skills array specially (convert to JSON) if "skills" in tool_input: skills = list(tool_input.get("skills", [])) update_fields.append("skills = %s") params.append(json.dumps(skills)) if not update_fields: return { "success": False, "error": "No fields provided to update" } # Always update the updated_at timestamp update_fields.append("updated_at = %s") params.append(datetime.now()) # Update location timestamp if lat/lng changed if "current_lat" in tool_input or "current_lng" in tool_input: update_fields.append("last_location_update = %s") params.append(datetime.now()) # Add driver_id for WHERE clause params.append(driver_id) # Execute update query = f""" UPDATE drivers SET {', '.join(update_fields)} WHERE driver_id = %s """ try: execute_write(query, tuple(params)) logger.info(f"Driver updated: {driver_id}") updated_list = list(updateable_fields.keys() & tool_input.keys()) if "skills" in tool_input: updated_list.append("skills") return { "success": True, "driver_id": driver_id, "updated_fields": updated_list, "message": f"Driver {driver_id} updated successfully!" } except Exception as e: logger.error(f"Database error updating driver: {e}") return { "success": False, "error": f"Failed to update driver: {str(e)}" } def handle_delete_all_drivers(tool_input: dict) -> dict: """ Delete all drivers (bulk delete) Args: tool_input: Dict with confirm flag and optional status filter Returns: Deletion result with count """ confirm = tool_input.get("confirm", False) status_filter = tool_input.get("status") # Optional: delete only specific status if not confirm: return { "success": False, "error": "Bulk deletion requires confirm=true for safety" } try: # Check for ANY assignments (RESTRICT constraint will block if any exist) assignments = execute_query(""" SELECT COUNT(*) as count FROM assignments """) assignment_count = assignments[0]['count'] if assignment_count > 0: return { "success": False, "error": f"Cannot delete drivers: {assignment_count} assignment(s) exist in database. Database RESTRICT constraint prevents driver deletion when assignments exist." } # Build delete query based on status filter if status_filter: count_query = "SELECT COUNT(*) as count FROM drivers WHERE status = %s" delete_query = "DELETE FROM drivers WHERE status = %s" params = (status_filter,) else: count_query = "SELECT COUNT(*) as count FROM drivers" delete_query = "DELETE FROM drivers" params = () # Get count before deletion count_result = execute_query(count_query, params) total_count = count_result[0]['count'] if total_count == 0: return { "success": True, "deleted_count": 0, "message": "No drivers to delete" } # Execute bulk delete execute_write(delete_query, params) logger.info(f"Bulk deleted {total_count} drivers") return { "success": True, "deleted_count": total_count, "message": f"Successfully deleted {total_count} driver(s)" } except Exception as e: logger.error(f"Database error bulk deleting drivers: {e}") # Provide more context if it's a FK constraint error error_message = str(e) if "foreign key" in error_message.lower() or "violates" in error_message.lower(): error_message = f"Cannot delete drivers due to database constraint (assignments exist). Error: {error_message}" return { "success": False, "error": f"Failed to bulk delete drivers: {error_message}" } def handle_delete_driver(tool_input: dict) -> dict: """ Execute driver deletion tool with assignment safety checks Args: tool_input: Dict with driver_id and confirm flag Returns: Deletion result """ driver_id = tool_input.get("driver_id") confirm = tool_input.get("confirm", False) # Validate required fields if not driver_id: return { "success": False, "error": "Missing required field: driver_id" } if not confirm: return { "success": False, "error": "Deletion not confirmed. Set confirm=true to proceed." } # Check if driver exists check_query = "SELECT driver_id, name FROM drivers WHERE driver_id = %s" existing = execute_query(check_query, (driver_id,)) if not existing: return { "success": False, "error": f"Driver {driver_id} not found" } driver_name = existing[0]["name"] # Check for ANY assignments (active or completed) # FK constraint with ON DELETE RESTRICT will prevent deletion if ANY assignments exist assignment_check = execute_query(""" SELECT assignment_id, status, order_id FROM assignments WHERE driver_id = %s """, (driver_id,)) if assignment_check: # Count active vs completed assignments active_assignments = [a for a in assignment_check if a["status"] in ("active", "in_progress")] completed_assignments = [a for a in assignment_check if a["status"] in ("completed", "failed", "cancelled")] total_count = len(assignment_check) active_count = len(active_assignments) completed_count = len(completed_assignments) error_msg = f"Cannot delete driver {driver_id} ({driver_name}): driver has {total_count} assignment(s)" if active_count > 0: active_ids = [a["assignment_id"] for a in active_assignments] error_msg += f" ({active_count} active: {', '.join(active_ids)})" if completed_count > 0: error_msg += f" ({completed_count} completed/failed/cancelled)" error_msg += ". The database has RESTRICT constraint preventing driver deletion when assignments exist. Please cancel/complete active assignments and consider archiving the driver instead of deleting." return { "success": False, "error": error_msg, "total_assignments": total_count, "active_assignments": [a["assignment_id"] for a in active_assignments], "completed_assignments": [a["assignment_id"] for a in completed_assignments] } # Check for orders that reference this driver in assigned_driver_id # FK constraint with ON DELETE SET NULL will set these to NULL assigned_orders = execute_query(""" SELECT order_id FROM orders WHERE assigned_driver_id = %s """, (driver_id,)) cascading_info = [] if assigned_orders: order_count = len(assigned_orders) cascading_info.append(f"{order_count} order(s) will have assigned_driver_id set to NULL") # Delete the driver query = "DELETE FROM drivers WHERE driver_id = %s" try: execute_write(query, (driver_id,)) logger.info(f"Driver deleted: {driver_id}") result = { "success": True, "driver_id": driver_id, "message": f"Driver {driver_id} ({driver_name}) has been permanently deleted." } if cascading_info: result["cascading_info"] = cascading_info return result except Exception as e: logger.error(f"Database error deleting driver: {e}") # Provide more context if it's a FK constraint error error_message = str(e) if "foreign key" in error_message.lower() or "violates" in error_message.lower(): error_message = f"Cannot delete driver due to database constraint (likely has related assignments). Error: {error_message}" return { "success": False, "error": f"Failed to delete driver: {error_message}" } def handle_count_orders(tool_input: dict) -> dict: """ Execute count orders tool Args: tool_input: Dict with optional filter fields Returns: Order count result with breakdown """ # Build WHERE clause based on filters where_clauses = [] params = [] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "priority" in tool_input: where_clauses.append("priority = %s") params.append(tool_input["priority"]) if "payment_status" in tool_input: where_clauses.append("payment_status = %s") params.append(tool_input["payment_status"]) if "assigned_driver_id" in tool_input: where_clauses.append("assigned_driver_id = %s") params.append(tool_input["assigned_driver_id"]) if "is_fragile" in tool_input: where_clauses.append("is_fragile = %s") params.append(tool_input["is_fragile"]) if "requires_signature" in tool_input: where_clauses.append("requires_signature = %s") params.append(tool_input["requires_signature"]) if "requires_cold_storage" in tool_input: where_clauses.append("requires_cold_storage = %s") params.append(tool_input["requires_cold_storage"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Total count query count_query = f"SELECT COUNT(*) as total FROM orders{where_sql}" # Breakdown by status query breakdown_query = f""" SELECT status, COUNT(*) as count FROM orders{where_sql} GROUP BY status ORDER BY count DESC """ # Breakdown by priority query priority_query = f""" SELECT priority, COUNT(*) as count FROM orders{where_sql} GROUP BY priority ORDER BY CASE priority WHEN 'urgent' THEN 1 WHEN 'express' THEN 2 WHEN 'standard' THEN 3 END """ try: # Execute queries total_result = execute_query(count_query, tuple(params) if params else None) total = total_result[0]['total'] if total_result else 0 status_result = execute_query(breakdown_query, tuple(params) if params else None) priority_result = execute_query(priority_query, tuple(params) if params else None) # Format breakdown status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {} priority_breakdown = {row['priority']: row['count'] for row in priority_result} if priority_result else {} logger.info(f"Counted orders: {total} total") return { "success": True, "total": total, "status_breakdown": status_breakdown, "priority_breakdown": priority_breakdown, "message": f"Found {total} order(s)" } except Exception as e: logger.error(f"Database error counting orders: {e}") return { "success": False, "error": f"Failed to count orders: {str(e)}" } def handle_fetch_orders(tool_input: dict) -> dict: """ Execute fetch orders tool Args: tool_input: Dict with filter, pagination, and sorting options Returns: List of orders matching criteria """ # Extract pagination and sorting limit = min(tool_input.get("limit", 10), 100) # Cap at 100 offset = tool_input.get("offset", 0) sort_by = tool_input.get("sort_by", "created_at") sort_order = tool_input.get("sort_order", "DESC") # Build WHERE clause based on filters where_clauses = [] params = [] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "priority" in tool_input: where_clauses.append("priority = %s") params.append(tool_input["priority"]) if "payment_status" in tool_input: where_clauses.append("payment_status = %s") params.append(tool_input["payment_status"]) if "assigned_driver_id" in tool_input: where_clauses.append("assigned_driver_id = %s") params.append(tool_input["assigned_driver_id"]) if "is_fragile" in tool_input: where_clauses.append("is_fragile = %s") params.append(tool_input["is_fragile"]) if "requires_signature" in tool_input: where_clauses.append("requires_signature = %s") params.append(tool_input["requires_signature"]) if "requires_cold_storage" in tool_input: where_clauses.append("requires_cold_storage = %s") params.append(tool_input["requires_cold_storage"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Build query query = f""" SELECT order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, priority, weight_kg, volume_m3, special_instructions, status, assigned_driver_id, created_at, updated_at, delivered_at, order_value, payment_status, requires_signature, is_fragile, requires_cold_storage FROM orders {where_sql} ORDER BY {sort_by} {sort_order} LIMIT %s OFFSET %s """ params.extend([limit, offset]) try: results = execute_query(query, tuple(params)) if not results: return { "success": True, "orders": [], "count": 0, "message": "No orders found matching criteria" } # Format orders for readability orders = [] for row in results: order = { "order_id": row['order_id'], "customer": { "name": row['customer_name'], "phone": row['customer_phone'], "email": row['customer_email'] }, "delivery": { "address": row['delivery_address'], "latitude": float(row['delivery_lat']) if row['delivery_lat'] else None, "longitude": float(row['delivery_lng']) if row['delivery_lng'] else None }, "time_window": { "start": str(row['time_window_start']) if row['time_window_start'] else None, "end": str(row['time_window_end']) if row['time_window_end'] else None }, "details": { "priority": row['priority'], "status": row['status'], "weight_kg": float(row['weight_kg']) if row['weight_kg'] else None, "volume_m3": float(row['volume_m3']) if row['volume_m3'] else None, "special_instructions": row['special_instructions'] }, "flags": { "requires_signature": row['requires_signature'], "is_fragile": row['is_fragile'], "requires_cold_storage": row['requires_cold_storage'] }, "payment": { "order_value": float(row['order_value']) if row['order_value'] else None, "payment_status": row['payment_status'] }, "assigned_driver_id": row['assigned_driver_id'], "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None, "delivered_at": str(row['delivered_at']) if row['delivered_at'] else None } } orders.append(order) logger.info(f"Fetched {len(orders)} orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Retrieved {len(orders)} order(s)" } except Exception as e: logger.error(f"Database error fetching orders: {e}") return { "success": False, "error": f"Failed to fetch orders: {str(e)}" } def handle_get_order_details(tool_input: dict) -> dict: """ Execute get order details tool Args: tool_input: Dict with order_id Returns: Complete order details """ order_id = tool_input.get("order_id") if not order_id: return { "success": False, "error": "order_id is required" } query = """ SELECT order_id, customer_name, customer_phone, customer_email, pickup_address, pickup_lat, pickup_lng, delivery_address, delivery_lat, delivery_lng, time_window_start, time_window_end, expected_delivery_time, priority, weight_kg, volume_m3, special_instructions, status, assigned_driver_id, delivery_status, created_at, updated_at, delivered_at, sla_grace_period_minutes, order_value, payment_status, requires_signature, is_fragile, requires_cold_storage FROM orders WHERE order_id = %s """ try: results = execute_query(query, (order_id,)) if not results: return { "success": False, "error": f"Order {order_id} not found" } row = results[0] order = { "order_id": row['order_id'], "customer": { "name": row['customer_name'], "phone": row['customer_phone'], "email": row['customer_email'] }, "pickup": { "address": row['pickup_address'], "latitude": float(row['pickup_lat']) if row['pickup_lat'] else None, "longitude": float(row['pickup_lng']) if row['pickup_lng'] else None } if row['pickup_address'] else None, "delivery": { "address": row['delivery_address'], "latitude": float(row['delivery_lat']) if row['delivery_lat'] else None, "longitude": float(row['delivery_lng']) if row['delivery_lng'] else None }, "time_window": { "start": str(row['time_window_start']) if row['time_window_start'] else None, "end": str(row['time_window_end']) if row['time_window_end'] else None }, "details": { "priority": row['priority'], "status": row['status'], "weight_kg": float(row['weight_kg']) if row['weight_kg'] else None, "volume_m3": float(row['volume_m3']) if row['volume_m3'] else None, "special_instructions": row['special_instructions'] }, "delivery_status": row['delivery_status'], "timing": { "expected_delivery_time": str(row['expected_delivery_time']) if row['expected_delivery_time'] else None, "delivered_at": str(row['delivered_at']) if row['delivered_at'] else None, "sla_grace_period_minutes": row['sla_grace_period_minutes'] }, "flags": { "requires_signature": row['requires_signature'], "is_fragile": row['is_fragile'], "requires_cold_storage": row['requires_cold_storage'] }, "payment": { "order_value": float(row['order_value']) if row['order_value'] else None, "payment_status": row['payment_status'] }, "assigned_driver_id": row['assigned_driver_id'], "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None } } logger.info(f"Retrieved details for order: {order_id}") return { "success": True, "order": order, "message": f"Order {order_id} details retrieved" } except Exception as e: logger.error(f"Database error getting order details: {e}") return { "success": False, "error": f"Failed to get order details: {str(e)}" } def handle_search_orders(tool_input: dict) -> dict: """ Execute search orders tool Args: tool_input: Dict with search_term Returns: List of matching orders """ search_term = tool_input.get("search_term", "").strip() if not search_term: return { "success": False, "error": "search_term is required" } query = """ SELECT order_id, customer_name, customer_phone, customer_email, delivery_address, priority, status, created_at FROM orders WHERE order_id ILIKE %s OR customer_name ILIKE %s OR customer_email ILIKE %s OR customer_phone ILIKE %s ORDER BY created_at DESC LIMIT 50 """ search_pattern = f"%{search_term}%" params = (search_pattern, search_pattern, search_pattern, search_pattern) try: results = execute_query(query, params) if not results: return { "success": True, "orders": [], "count": 0, "message": f"No orders found matching '{search_term}'" } orders = [] for row in results: orders.append({ "order_id": row['order_id'], "customer_name": row['customer_name'], "customer_phone": row['customer_phone'], "customer_email": row['customer_email'], "delivery_address": row['delivery_address'], "priority": row['priority'], "status": row['status'], "created_at": str(row['created_at']) }) logger.info(f"Search '{search_term}' found {len(orders)} orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Found {len(orders)} order(s) matching '{search_term}'" } except Exception as e: logger.error(f"Database error searching orders: {e}") return { "success": False, "error": f"Failed to search orders: {str(e)}" } def handle_get_incomplete_orders(tool_input: dict) -> dict: """ Execute get incomplete orders tool Args: tool_input: Dict with optional limit Returns: List of incomplete orders (pending, assigned, in_transit) """ limit = min(tool_input.get("limit", 20), 100) query = """ SELECT order_id, customer_name, delivery_address, priority, status, time_window_end, created_at, assigned_driver_id FROM orders WHERE status IN ('pending', 'assigned', 'in_transit') ORDER BY CASE priority WHEN 'urgent' THEN 1 WHEN 'express' THEN 2 WHEN 'standard' THEN 3 END, time_window_end ASC LIMIT %s """ try: results = execute_query(query, (limit,)) if not results: return { "success": True, "orders": [], "count": 0, "message": "No incomplete orders found" } orders = [] for row in results: orders.append({ "order_id": row['order_id'], "customer_name": row['customer_name'], "delivery_address": row['delivery_address'], "priority": row['priority'], "status": row['status'], "time_window_end": str(row['time_window_end']) if row['time_window_end'] else None, "created_at": str(row['created_at']), "assigned_driver_id": row['assigned_driver_id'] }) logger.info(f"Retrieved {len(orders)} incomplete orders") return { "success": True, "orders": orders, "count": len(orders), "message": f"Found {len(orders)} incomplete order(s)" } except Exception as e: logger.error(f"Database error getting incomplete orders: {e}") return { "success": False, "error": f"Failed to get incomplete orders: {str(e)}" } def handle_count_drivers(tool_input: dict) -> dict: """ Execute count drivers tool Args: tool_input: Dict with optional filter fields Returns: Driver count result with breakdown """ # Build WHERE clause based on filters where_clauses = [] params = [] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "vehicle_type" in tool_input: where_clauses.append("vehicle_type = %s") params.append(tool_input["vehicle_type"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Total count query count_query = f"SELECT COUNT(*) as total FROM drivers{where_sql}" # Breakdown by status query status_query = f""" SELECT status, COUNT(*) as count FROM drivers{where_sql} GROUP BY status ORDER BY count DESC """ # Breakdown by vehicle type query vehicle_query = f""" SELECT vehicle_type, COUNT(*) as count FROM drivers{where_sql} GROUP BY vehicle_type ORDER BY count DESC """ try: # Execute queries total_result = execute_query(count_query, tuple(params) if params else None) total = total_result[0]['total'] if total_result else 0 status_result = execute_query(status_query, tuple(params) if params else None) vehicle_result = execute_query(vehicle_query, tuple(params) if params else None) # Format breakdown status_breakdown = {row['status']: row['count'] for row in status_result} if status_result else {} vehicle_breakdown = {row['vehicle_type']: row['count'] for row in vehicle_result if row['vehicle_type']} if vehicle_result else {} logger.info(f"Counted drivers: {total} total") return { "success": True, "total": total, "status_breakdown": status_breakdown, "vehicle_breakdown": vehicle_breakdown, "message": f"Found {total} driver(s)" } except Exception as e: logger.error(f"Database error counting drivers: {e}") return { "success": False, "error": f"Failed to count drivers: {str(e)}" } def handle_fetch_drivers(tool_input: dict) -> dict: """ Execute fetch drivers tool Args: tool_input: Dict with filter, pagination, and sorting options Returns: List of drivers matching criteria """ # Extract pagination and sorting limit = min(tool_input.get("limit", 10), 100) # Cap at 100 offset = tool_input.get("offset", 0) sort_by = tool_input.get("sort_by", "name") sort_order = tool_input.get("sort_order", "ASC") # Build WHERE clause based on filters where_clauses = [] params = [] if "status" in tool_input: where_clauses.append("status = %s") params.append(tool_input["status"]) if "vehicle_type" in tool_input: where_clauses.append("vehicle_type = %s") params.append(tool_input["vehicle_type"]) where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else "" # Build query query = f""" SELECT driver_id, name, phone, email, current_lat, current_lng, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, created_at, updated_at FROM drivers {where_sql} ORDER BY {sort_by} {sort_order} LIMIT %s OFFSET %s """ params.extend([limit, offset]) try: results = execute_query(query, tuple(params)) if not results: return { "success": True, "drivers": [], "count": 0, "message": "No drivers found matching criteria" } # Format drivers for readability drivers = [] for row in results: # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] driver = { "driver_id": row['driver_id'], "name": row['name'], "contact": { "phone": row['phone'], "email": row['email'] }, "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills, "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None } } drivers.append(driver) logger.info(f"Fetched {len(drivers)} drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Retrieved {len(drivers)} driver(s)" } except Exception as e: logger.error(f"Database error fetching drivers: {e}") return { "success": False, "error": f"Failed to fetch drivers: {str(e)}" } def handle_get_driver_details(tool_input: dict) -> dict: """ Execute get driver details tool Args: tool_input: Dict with driver_id Returns: Complete driver details """ driver_id = tool_input.get("driver_id") if not driver_id: return { "success": False, "error": "driver_id is required" } query = """ SELECT driver_id, name, phone, email, current_lat, current_lng, last_location_update, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, created_at, updated_at FROM drivers WHERE driver_id = %s """ try: results = execute_query(query, (driver_id,)) if not results: return { "success": False, "error": f"Driver {driver_id} not found" } row = results[0] # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] # Reverse geocode location to get address location_address = None if row['current_lat'] and row['current_lng']: try: from chat.geocoding import GeocodingService geocoding_service = GeocodingService() reverse_result = geocoding_service.reverse_geocode( float(row['current_lat']), float(row['current_lng']) ) location_address = reverse_result.get('formatted_address', None) logger.info(f"Reverse geocoded driver location: {location_address}") except Exception as e: logger.warning(f"Failed to reverse geocode driver location: {e}") location_address = None driver = { "driver_id": row['driver_id'], "name": row['name'], "contact": { "phone": row['phone'], "email": row['email'] }, "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "address": location_address, "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills, "timestamps": { "created_at": str(row['created_at']), "updated_at": str(row['updated_at']) if row['updated_at'] else None } } logger.info(f"Retrieved details for driver: {driver_id}") return { "success": True, "driver": driver, "message": f"Driver {driver_id} details retrieved" } except Exception as e: logger.error(f"Database error getting driver details: {e}") return { "success": False, "error": f"Failed to get driver details: {str(e)}" } def handle_search_drivers(tool_input: dict) -> dict: """ Execute search drivers tool Args: tool_input: Dict with search_term Returns: List of matching drivers """ search_term = tool_input.get("search_term", "").strip() if not search_term: return { "success": False, "error": "search_term is required" } query = """ SELECT driver_id, name, phone, email, vehicle_type, vehicle_plate, status, created_at FROM drivers WHERE driver_id ILIKE %s OR name ILIKE %s OR email ILIKE %s OR phone ILIKE %s OR vehicle_plate ILIKE %s ORDER BY name ASC LIMIT 50 """ search_pattern = f"%{search_term}%" params = (search_pattern, search_pattern, search_pattern, search_pattern, search_pattern) try: results = execute_query(query, params) if not results: return { "success": True, "drivers": [], "count": 0, "message": f"No drivers found matching '{search_term}'" } drivers = [] for row in results: drivers.append({ "driver_id": row['driver_id'], "name": row['name'], "phone": row['phone'], "email": row['email'], "vehicle_type": row['vehicle_type'], "vehicle_plate": row['vehicle_plate'], "status": row['status'], "created_at": str(row['created_at']) }) logger.info(f"Search '{search_term}' found {len(drivers)} drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Found {len(drivers)} driver(s) matching '{search_term}'" } except Exception as e: logger.error(f"Database error searching drivers: {e}") return { "success": False, "error": f"Failed to search drivers: {str(e)}" } def handle_get_available_drivers(tool_input: dict) -> dict: """ Execute get available drivers tool Args: tool_input: Dict with optional limit Returns: List of available drivers (active or offline) """ limit = min(tool_input.get("limit", 20), 100) query = """ SELECT driver_id, name, phone, vehicle_type, vehicle_plate, current_lat, current_lng, last_location_update, status, capacity_kg, capacity_m3, skills FROM drivers WHERE status IN ('active', 'offline') ORDER BY CASE status WHEN 'active' THEN 1 WHEN 'offline' THEN 2 END, name ASC LIMIT %s """ try: results = execute_query(query, (limit,)) if not results: return { "success": True, "drivers": [], "count": 0, "message": "No available drivers found" } drivers = [] for row in results: # Parse skills JSON if present skills = [] if row['skills']: try: import json skills = json.loads(row['skills']) if isinstance(row['skills'], str) else row['skills'] except: skills = [] drivers.append({ "driver_id": row['driver_id'], "name": row['name'], "phone": row['phone'], "location": { "latitude": float(row['current_lat']) if row['current_lat'] else None, "longitude": float(row['current_lng']) if row['current_lng'] else None, "last_update": str(row['last_location_update']) if row['last_location_update'] else None }, "status": row['status'], "vehicle": { "type": row['vehicle_type'], "plate": row['vehicle_plate'], "capacity_kg": float(row['capacity_kg']) if row['capacity_kg'] else None, "capacity_m3": float(row['capacity_m3']) if row['capacity_m3'] else None }, "skills": skills }) logger.info(f"Retrieved {len(drivers)} available drivers") return { "success": True, "drivers": drivers, "count": len(drivers), "message": f"Found {len(drivers)} available driver(s)" } except Exception as e: logger.error(f"Database error getting available drivers: {e}") return { "success": False, "error": f"Failed to get available drivers: {str(e)}" } # ============================================================================ # ASSIGNMENT MANAGEMENT TOOLS # ============================================================================ def handle_create_assignment(tool_input: dict) -> dict: """ Create assignment (assign order to driver) Validates order and driver status, calculates route, creates assignment record, and updates order/driver statuses. Args: tool_input: Dict with order_id and driver_id Returns: Assignment creation result with route data """ from datetime import datetime, timedelta order_id = (tool_input.get("order_id") or "").strip() driver_id = (tool_input.get("driver_id") or "").strip() if not order_id or not driver_id: return { "success": False, "error": "Both order_id and driver_id are required" } logger.info(f"Creating assignment: order={order_id}, driver={driver_id}") try: conn = get_db_connection() cursor = conn.cursor() # Step 1: Validate order exists and status is "pending" cursor.execute(""" SELECT status, delivery_lat, delivery_lng, delivery_address, assigned_driver_id FROM orders WHERE order_id = %s """, (order_id,)) order_row = cursor.fetchone() if not order_row: cursor.close() conn.close() return { "success": False, "error": f"Order not found: {order_id}" } order_status = order_row['status'] delivery_lat = order_row['delivery_lat'] delivery_lng = order_row['delivery_lng'] delivery_address = order_row['delivery_address'] current_driver = order_row['assigned_driver_id'] if order_status != "pending": cursor.close() conn.close() # Provide helpful error message based on current status if order_status == "assigned" and current_driver: # Get current driver name for better error message cursor2 = get_db_connection().cursor() cursor2.execute("SELECT name FROM drivers WHERE driver_id = %s", (current_driver,)) driver_row = cursor2.fetchone() driver_name = driver_row['name'] if driver_row else current_driver cursor2.close() return { "success": False, "error": f"Order {order_id} is already assigned to driver {driver_name}. Use 'unassign_order' first to reassign to a different driver." } else: return { "success": False, "error": f"Order must be in 'pending' status to be assigned. Current status: '{order_status}'" } if not delivery_lat or not delivery_lng: cursor.close() conn.close() return { "success": False, "error": "Order does not have delivery location coordinates" } # Step 2: Validate driver exists and status is "active" cursor.execute(""" SELECT status, current_lat, current_lng, vehicle_type, name FROM drivers WHERE driver_id = %s """, (driver_id,)) driver_row = cursor.fetchone() if not driver_row: cursor.close() conn.close() return { "success": False, "error": f"Driver not found: {driver_id}" } driver_status = driver_row['status'] driver_lat = driver_row['current_lat'] driver_lng = driver_row['current_lng'] vehicle_type = driver_row['vehicle_type'] driver_name = driver_row['name'] if driver_status not in ["active", "available"]: cursor.close() conn.close() return { "success": False, "error": f"Driver must be 'active' or 'available'. Current status: {driver_status}" } if not driver_lat or not driver_lng: cursor.close() conn.close() return { "success": False, "error": "Driver does not have current location" } # Step 3: Check if order already has active assignment cursor.execute(""" SELECT assignment_id, driver_id FROM assignments WHERE order_id = %s AND status IN ('active', 'in_progress') """, (order_id,)) existing_assignment = cursor.fetchone() if existing_assignment: cursor.close() conn.close() existing_asn_id = existing_assignment['assignment_id'] existing_driver_id = existing_assignment['driver_id'] # Get driver name for better error message cursor2 = get_db_connection().cursor() cursor2.execute("SELECT name FROM drivers WHERE driver_id = %s", (existing_driver_id,)) driver_row = cursor2.fetchone() existing_driver_name = driver_row['name'] if driver_row else existing_driver_id cursor2.close() return { "success": False, "error": f"Order {order_id} is already assigned to driver {existing_driver_name} (Assignment: {existing_asn_id}). Use 'unassign_order' first to reassign." } # Step 4: Calculate route from driver location to delivery location logger.info(f"Calculating route: ({driver_lat},{driver_lng}) -> ({delivery_lat},{delivery_lng})") route_result = handle_calculate_route({ "origin": f"{driver_lat},{driver_lng}", "destination": f"{delivery_lat},{delivery_lng}", "vehicle_type": vehicle_type or "car", "alternatives": False, "include_steps": True # Get turn-by-turn directions }) if not route_result.get("success"): cursor.close() conn.close() return { "success": False, "error": f"Route calculation failed: {route_result.get('error', 'Unknown error')}" } # Step 5: Generate assignment ID timestamp = datetime.now().strftime("%Y%m%d%H%M%S%f") assignment_id = f"ASN-{timestamp}" # Step 6: Calculate estimated arrival duration_seconds = route_result.get("duration_in_traffic", {}).get("seconds", 0) estimated_arrival = datetime.now() + timedelta(seconds=duration_seconds) # Step 7: Create assignment record import json # Extract route directions (turn-by-turn steps) route_directions = route_result.get("steps", []) route_directions_json = json.dumps(route_directions) if route_directions else None cursor.execute(""" INSERT INTO assignments ( assignment_id, order_id, driver_id, route_distance_meters, route_duration_seconds, route_duration_in_traffic_seconds, route_summary, route_confidence, route_directions, driver_start_location_lat, driver_start_location_lng, delivery_location_lat, delivery_location_lng, delivery_address, estimated_arrival, vehicle_type, traffic_delay_seconds, status ) VALUES ( %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s ) """, ( assignment_id, order_id, driver_id, route_result.get("distance", {}).get("meters", 0), route_result.get("duration", {}).get("seconds", 0), route_result.get("duration_in_traffic", {}).get("seconds", 0), route_result.get("route_summary", ""), route_result.get("confidence", ""), route_directions_json, driver_lat, driver_lng, delivery_lat, delivery_lng, delivery_address, estimated_arrival, vehicle_type, route_result.get("traffic_delay", {}).get("seconds", 0), "active" )) # Step 8: Update order status and assigned driver cursor.execute(""" UPDATE orders SET status = 'assigned', assigned_driver_id = %s WHERE order_id = %s """, (driver_id, order_id)) # Step 9: Update driver status to busy cursor.execute(""" UPDATE drivers SET status = 'busy' WHERE driver_id = %s """, (driver_id,)) conn.commit() cursor.close() conn.close() logger.info(f"Assignment created successfully: {assignment_id}") return { "success": True, "assignment_id": assignment_id, "order_id": order_id, "driver_id": driver_id, "driver_name": driver_name, "route": { "distance": route_result.get("distance", {}).get("text", ""), "duration": route_result.get("duration", {}).get("text", ""), "duration_in_traffic": route_result.get("duration_in_traffic", {}).get("text", ""), "traffic_delay": route_result.get("traffic_delay", {}).get("text", ""), "summary": route_result.get("route_summary", ""), "directions": route_directions # Turn-by-turn navigation steps }, "estimated_arrival": estimated_arrival.isoformat(), "status": "active", "message": f"Order {order_id} assigned to driver {driver_name} ({driver_id})" } except Exception as e: logger.error(f"Failed to create assignment: {e}") return { "success": False, "error": f"Failed to create assignment: {str(e)}" } def handle_auto_assign_order(tool_input: dict) -> dict: """ Automatically assign order to nearest available driver (distance + validation based). Selection criteria: 1. Driver must be 'active' with valid location 2. Driver vehicle capacity must meet package weight/volume requirements 3. Driver must have required skills (fragile handling, cold storage, etc.) 4. Selects nearest driver by real-time route distance Args: tool_input: Dict with order_id Returns: Assignment details with selected driver info and distance """ order_id = (tool_input.get("order_id") or "").strip() if not order_id: return { "success": False, "error": "Missing required field: order_id" } try: conn = get_db_connection() cursor = conn.cursor(cursor_factory=RealDictCursor) # Step 1: Get order details with ALL requirements cursor.execute(""" SELECT order_id, customer_name, delivery_address, delivery_lat, delivery_lng, status, weight_kg, volume_m3, is_fragile, requires_cold_storage, requires_signature, priority, assigned_driver_id FROM orders WHERE order_id = %s """, (order_id,)) order = cursor.fetchone() if not order: cursor.close() conn.close() return { "success": False, "error": f"Order not found: {order_id}" } if order['status'] != 'pending': cursor.close() conn.close() return { "success": False, "error": f"Order must be 'pending' to auto-assign. Current status: {order['status']}" } if not order['delivery_lat'] or not order['delivery_lng']: cursor.close() conn.close() return { "success": False, "error": "Order missing delivery coordinates. Cannot calculate routes." } # Extract order requirements required_weight_kg = order['weight_kg'] or 0 required_volume_m3 = order['volume_m3'] or 0 needs_fragile_handling = order['is_fragile'] or False needs_cold_storage = order['requires_cold_storage'] or False # Step 2: Get all active drivers with valid locations cursor.execute(""" SELECT driver_id, name, phone, current_lat, current_lng, vehicle_type, capacity_kg, capacity_m3, skills FROM drivers WHERE status = 'active' AND current_lat IS NOT NULL AND current_lng IS NOT NULL """) active_drivers = cursor.fetchall() if not active_drivers: cursor.close() conn.close() return { "success": False, "error": "No active drivers available with valid location" } # Step 3: Filter and score each driver suitable_drivers = [] for driver in active_drivers: # Validate capacity (weight and volume) driver_capacity_kg = driver['capacity_kg'] or 0 driver_capacity_m3 = driver['capacity_m3'] or 0 if driver_capacity_kg < required_weight_kg: logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Insufficient weight capacity: {driver_capacity_kg}kg < {required_weight_kg}kg") continue if driver_capacity_m3 < required_volume_m3: logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Insufficient volume capacity: {driver_capacity_m3}m³ < {required_volume_m3}m³") continue # Validate skills driver_skills = driver['skills'] or [] if needs_fragile_handling and "fragile_handler" not in driver_skills: logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Missing fragile_handler skill") continue if needs_cold_storage and "refrigerated" not in driver_skills: logger.info(f"Driver {driver['driver_id']} ({driver['name']}) - Missing refrigerated skill") continue # Step 4: Calculate real-time route distance route_result = handle_calculate_route({ "origin": f"{driver['current_lat']},{driver['current_lng']}", "destination": f"{order['delivery_lat']},{order['delivery_lng']}", "vehicle_type": driver['vehicle_type'], "include_steps": False # We don't need turn-by-turn for scoring }) if not route_result.get("success"): logger.warning(f"Driver {driver['driver_id']} ({driver['name']}) - Route calculation failed: {route_result.get('error')}") continue # Extract distance distance_meters = route_result.get('distance_meters', 999999) duration_seconds = route_result.get('duration_in_traffic_seconds', 0) suitable_drivers.append({ "driver": driver, "distance_meters": distance_meters, "distance_km": distance_meters / 1000, "duration_seconds": duration_seconds, "duration_minutes": duration_seconds / 60, "route_data": route_result }) if not suitable_drivers: cursor.close() conn.close() return { "success": False, "error": "No suitable drivers found. All active drivers failed capacity or skill requirements." } # Step 5: Sort by distance (nearest first) suitable_drivers.sort(key=lambda x: x['distance_meters']) # Step 6: Select nearest driver best_match = suitable_drivers[0] selected_driver = best_match['driver'] logger.info(f"Auto-assign: Selected driver {selected_driver['driver_id']} ({selected_driver['name']}) - {best_match['distance_km']:.2f}km away") cursor.close() conn.close() # Step 7: Create assignment using existing function assignment_result = handle_create_assignment({ "order_id": order_id, "driver_id": selected_driver['driver_id'] }) if not assignment_result.get("success"): return assignment_result # Step 8: Return enhanced response with selection info return { "success": True, "assignment_id": assignment_result['assignment_id'], "method": "auto_assignment", "order_id": order_id, "driver_id": selected_driver['driver_id'], "driver_name": selected_driver['name'], "driver_phone": selected_driver['phone'], "driver_vehicle_type": selected_driver['vehicle_type'], "selection_reason": "Nearest available driver meeting all requirements", "distance_km": round(best_match['distance_km'], 2), "distance_meters": best_match['distance_meters'], "estimated_duration_minutes": round(best_match['duration_minutes'], 1), "candidates_evaluated": len(active_drivers), "suitable_candidates": len(suitable_drivers), "route_summary": assignment_result.get('route_summary'), "estimated_arrival": assignment_result.get('estimated_arrival'), "assignment_details": assignment_result } except Exception as e: logger.error(f"Failed to auto-assign order: {e}") return { "success": False, "error": f"Failed to auto-assign order: {str(e)}" } def handle_intelligent_assign_order(tool_input: dict) -> dict: """ Intelligently assign order using Gemini AI to analyze all parameters. Uses Google's Gemini AI to evaluate: - Order characteristics (priority, weight, fragility, time constraints) - All available drivers (location, capacity, skills, vehicle type) - Real-time routing data (distance, traffic, weather) - Complex tradeoffs and optimal matching Returns assignment with AI reasoning explaining the selection. Args: tool_input: Dict with order_id Returns: Assignment details with AI reasoning and selected driver info """ import os import json import google.generativeai as genai from datetime import datetime order_id = (tool_input.get("order_id") or "").strip() if not order_id: return { "success": False, "error": "Missing required field: order_id" } # Check for Gemini API key gemini_api_key = os.getenv("GOOGLE_API_KEY") if not gemini_api_key: return { "success": False, "error": "GOOGLE_API_KEY environment variable not set. Required for intelligent assignment." } try: conn = get_db_connection() cursor = conn.cursor(cursor_factory=RealDictCursor) # Step 1: Get complete order details cursor.execute(""" SELECT order_id, customer_name, customer_phone, customer_email, delivery_address, delivery_lat, delivery_lng, pickup_address, pickup_lat, pickup_lng, time_window_start, time_window_end, expected_delivery_time, priority, weight_kg, volume_m3, order_value, is_fragile, requires_cold_storage, requires_signature, payment_status, special_instructions, status, created_at, sla_grace_period_minutes FROM orders WHERE order_id = %s """, (order_id,)) order = cursor.fetchone() if not order: cursor.close() conn.close() return { "success": False, "error": f"Order not found: {order_id}" } if order['status'] != 'pending': cursor.close() conn.close() return { "success": False, "error": f"Order must be 'pending' to assign. Current status: {order['status']}" } if not order['delivery_lat'] or not order['delivery_lng']: cursor.close() conn.close() return { "success": False, "error": "Order missing delivery coordinates. Cannot calculate routes." } # Step 2: Get all active drivers with complete details cursor.execute(""" SELECT driver_id, name, phone, email, current_lat, current_lng, last_location_update, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, skills, status, created_at, updated_at FROM drivers WHERE status = 'active' AND current_lat IS NOT NULL AND current_lng IS NOT NULL """) active_drivers = cursor.fetchall() if not active_drivers: cursor.close() conn.close() return { "success": False, "error": "No active drivers available with valid location" } # Step 3: Calculate routing data for each driver drivers_with_routes = [] for driver in active_drivers: # Calculate route with traffic route_result = handle_calculate_route({ "origin": f"{driver['current_lat']},{driver['current_lng']}", "destination": f"{order['delivery_lat']},{order['delivery_lng']}", "vehicle_type": driver['vehicle_type'], "include_steps": False }) # Get weather-aware routing if available try: intelligent_route = handle_calculate_intelligent_route({ "origin": f"{driver['current_lat']},{driver['current_lng']}", "destination": f"{order['delivery_lat']},{order['delivery_lng']}", "vehicle_type": driver['vehicle_type'] }) weather_data = intelligent_route.get('weather', {}) except: weather_data = {} if route_result.get("success"): drivers_with_routes.append({ "driver_id": driver['driver_id'], "name": driver['name'], "phone": driver['phone'], "vehicle_type": driver['vehicle_type'], "vehicle_plate": driver['vehicle_plate'], "capacity_kg": float(driver['capacity_kg']) if driver['capacity_kg'] else 0, "capacity_m3": float(driver['capacity_m3']) if driver['capacity_m3'] else 0, "skills": driver['skills'] or [], "current_location": { "lat": float(driver['current_lat']), "lng": float(driver['current_lng']) }, "route_to_delivery": { "distance_km": round(route_result.get('distance_meters', 0) / 1000, 2), "distance_meters": route_result.get('distance_meters', 0), "duration_minutes": round(route_result.get('duration_in_traffic_seconds', 0) / 60, 1), "traffic_delay_seconds": route_result.get('traffic_delay_seconds', 0), "route_summary": route_result.get('route_summary', ''), "has_tolls": route_result.get('has_tolls', False) }, "weather_conditions": weather_data }) if not drivers_with_routes: cursor.close() conn.close() return { "success": False, "error": "Unable to calculate routes for any active drivers" } cursor.close() conn.close() # Step 4: Build comprehensive context for Gemini order_context = { "order_id": order['order_id'], "customer": { "name": order['customer_name'], "phone": order['customer_phone'] }, "delivery": { "address": order['delivery_address'], "coordinates": {"lat": float(order['delivery_lat']), "lng": float(order['delivery_lng'])} }, "time_constraints": { "expected_delivery_time": str(order['expected_delivery_time']) if order['expected_delivery_time'] else None, "time_window_start": str(order['time_window_start']) if order['time_window_start'] else None, "time_window_end": str(order['time_window_end']) if order['time_window_end'] else None, "sla_grace_period_minutes": order['sla_grace_period_minutes'], "created_at": str(order['created_at']) }, "package": { "weight_kg": float(order['weight_kg']) if order['weight_kg'] else 0, "volume_m3": float(order['volume_m3']) if order['volume_m3'] else 0, "value": float(order['order_value']) if order['order_value'] else 0, "is_fragile": order['is_fragile'] or False, "requires_cold_storage": order['requires_cold_storage'] or False, "requires_signature": order['requires_signature'] or False }, "priority": order['priority'], "payment_status": order['payment_status'], "special_instructions": order['special_instructions'] } # Step 5: Call Gemini AI for intelligent decision genai.configure(api_key=gemini_api_key) model = genai.GenerativeModel('gemini-2.0-flash-exp') prompt = f"""You are an intelligent fleet management AI. Analyze the following delivery order and available drivers to select the BEST driver for this assignment. **ORDER DETAILS:** {json.dumps(order_context, indent=2)} **AVAILABLE DRIVERS ({len(drivers_with_routes)}):** {json.dumps(drivers_with_routes, indent=2)} **CURRENT TIME:** {datetime.now().isoformat()} **YOUR TASK:** Analyze ALL parameters comprehensively: 1. **Distance & Route Efficiency**: Consider route distance, traffic delays, tolls 2. **Vehicle Matching**: Match vehicle type and capacity to package requirements 3. **Skills Requirements**: Ensure driver has necessary skills (fragile handling, cold storage) 4. **Time Constraints**: Evaluate ability to meet expected delivery time 5. **Priority Level**: Factor in order priority (urgent > express > standard) 6. **Weather Conditions**: Consider weather impact on delivery safety and speed 7. **Special Requirements**: Account for signature requirements, special instructions 8. **Cost Efficiency**: Consider fuel costs, toll roads, driver utilization **RESPONSE FORMAT (JSON only, no markdown):** {{ "selected_driver_id": "DRV-XXXXXXXXX", "confidence_score": 0.95, "reasoning": {{ "primary_factors": ["Nearest driver (5.2km)", "Has fragile_handler skill", "Sufficient capacity"], "trade_offs_considered": ["Driver A was 1km closer but lacked required skills", "Driver B had larger capacity but 15min further"], "risk_assessment": "Low risk - clear weather, light traffic, experienced driver", "decision_summary": "Selected Driver X because they offer the best balance of proximity (5.2km), required skills (fragile_handler), and adequate capacity (10kg) for this urgent fragile delivery." }}, "alternatives": [ {{"driver_id": "DRV-YYY", "reason_not_selected": "Missing fragile_handler skill"}}, {{"driver_id": "DRV-ZZZ", "reason_not_selected": "15 minutes further away"}} ] }} **IMPORTANT:** Return ONLY valid JSON. Do not include markdown formatting, code blocks, or explanatory text outside the JSON.""" response = model.generate_content(prompt) response_text = response.text.strip() # Clean response (remove markdown code blocks if present) if response_text.startswith("```json"): response_text = response_text[7:] if response_text.startswith("```"): response_text = response_text[3:] if response_text.endswith("```"): response_text = response_text[:-3] response_text = response_text.strip() # Parse Gemini response try: ai_decision = json.loads(response_text) except json.JSONDecodeError as e: logger.error(f"Failed to parse Gemini response: {e}") logger.error(f"Response text: {response_text}") return { "success": False, "error": f"Failed to parse AI response. Invalid JSON returned by Gemini: {str(e)}" } selected_driver_id = ai_decision.get("selected_driver_id") if not selected_driver_id: return { "success": False, "error": "AI did not select a driver" } # Validate selected driver is still available selected_driver = next((d for d in drivers_with_routes if d["driver_id"] == selected_driver_id), None) if not selected_driver: return { "success": False, "error": f"AI selected driver {selected_driver_id} but driver not found in available list" } # Step 6: Create assignment using existing function logger.info(f"Intelligent-assign: AI selected driver {selected_driver_id} ({selected_driver['name']})") assignment_result = handle_create_assignment({ "order_id": order_id, "driver_id": selected_driver_id }) if not assignment_result.get("success"): return assignment_result # Step 7: Return enhanced response with AI reasoning return { "success": True, "assignment_id": assignment_result['assignment_id'], "method": "intelligent_assignment", "ai_provider": "Google Gemini 2.0 Flash", "ai_model": "gemini-2.0-flash-exp", "order_id": order_id, "driver_id": selected_driver_id, "driver_name": selected_driver['name'], "driver_phone": selected_driver['phone'], "driver_vehicle_type": selected_driver['vehicle_type'], "distance_km": selected_driver['route_to_delivery']['distance_km'], "estimated_duration_minutes": selected_driver['route_to_delivery']['duration_minutes'], "ai_reasoning": ai_decision.get('reasoning', {}), "confidence_score": ai_decision.get('confidence_score', 0), "alternatives_considered": ai_decision.get('alternatives', []), "candidates_evaluated": len(drivers_with_routes), "route_summary": assignment_result.get('route_summary'), "estimated_arrival": assignment_result.get('estimated_arrival'), "assignment_details": assignment_result } except Exception as e: logger.error(f"Failed to intelligently assign order: {e}") return { "success": False, "error": f"Failed to intelligently assign order: {str(e)}" } def handle_get_assignment_details(tool_input: dict) -> dict: """ Get assignment details Can query by assignment_id, order_id, or driver_id. Returns assignment with route data and related order/driver info. Args: tool_input: Dict with assignment_id, order_id, or driver_id Returns: Assignment details or list of assignments """ assignment_id = (tool_input.get("assignment_id") or "").strip() order_id = (tool_input.get("order_id") or "").strip() driver_id = (tool_input.get("driver_id") or "").strip() if not assignment_id and not order_id and not driver_id: return { "success": False, "error": "Provide at least one of: assignment_id, order_id, or driver_id" } try: conn = get_db_connection() cursor = conn.cursor() # Build query based on provided parameters query = """ SELECT a.assignment_id, a.order_id, a.driver_id, a.status, a.assigned_at, a.updated_at, a.estimated_arrival, a.actual_arrival, a.route_distance_meters, a.route_duration_seconds, a.route_duration_in_traffic_seconds, a.route_summary, a.route_confidence, a.traffic_delay_seconds, a.route_directions, a.driver_start_location_lat, a.driver_start_location_lng, a.delivery_location_lat, a.delivery_location_lng, a.delivery_address, a.vehicle_type, a.sequence_number, a.notes, a.failure_reason, o.customer_name, o.status as order_status, d.name as driver_name, d.status as driver_status, d.phone as driver_phone FROM assignments a LEFT JOIN orders o ON a.order_id = o.order_id LEFT JOIN drivers d ON a.driver_id = d.driver_id WHERE 1=1 """ params = [] if assignment_id: query += " AND a.assignment_id = %s" params.append(assignment_id) if order_id: query += " AND a.order_id = %s" params.append(order_id) if driver_id: query += " AND a.driver_id = %s" params.append(driver_id) query += " ORDER BY a.assigned_at DESC" cursor.execute(query, params) rows = cursor.fetchall() cursor.close() conn.close() if not rows: return { "success": False, "error": "No assignments found matching criteria" } # Format results assignments = [] for row in rows: assignment = { "assignment_id": row['assignment_id'], "order_id": row['order_id'], "driver_id": row['driver_id'], "status": row['status'], "assigned_at": row['assigned_at'].isoformat() if row['assigned_at'] else None, "updated_at": row['updated_at'].isoformat() if row['updated_at'] else None, "estimated_arrival": row['estimated_arrival'].isoformat() if row['estimated_arrival'] else None, "actual_arrival": row['actual_arrival'].isoformat() if row['actual_arrival'] else None, "route": { "distance_meters": row['route_distance_meters'], "distance_km": round(row['route_distance_meters'] / 1000, 2) if row['route_distance_meters'] else 0, "duration_seconds": row['route_duration_seconds'], "duration_minutes": round(row['route_duration_seconds'] / 60, 1) if row['route_duration_seconds'] else 0, "duration_in_traffic_seconds": row['route_duration_in_traffic_seconds'], "duration_in_traffic_minutes": round(row['route_duration_in_traffic_seconds'] / 60, 1) if row['route_duration_in_traffic_seconds'] else 0, "summary": row['route_summary'], "confidence": row['route_confidence'], "traffic_delay_seconds": row['traffic_delay_seconds'], "traffic_delay_minutes": round(row['traffic_delay_seconds'] / 60, 1) if row['traffic_delay_seconds'] else 0, "directions": row['route_directions'] # Turn-by-turn navigation steps }, "driver_start_location": { "lat": float(row['driver_start_location_lat']) if row['driver_start_location_lat'] else None, "lng": float(row['driver_start_location_lng']) if row['driver_start_location_lng'] else None }, "delivery_location": { "lat": float(row['delivery_location_lat']) if row['delivery_location_lat'] else None, "lng": float(row['delivery_location_lng']) if row['delivery_location_lng'] else None, "address": row['delivery_address'] }, "vehicle_type": row['vehicle_type'], "sequence_number": row['sequence_number'], "notes": row['notes'], "failure_reason": row['failure_reason'], "order": { "customer_name": row['customer_name'], "status": row['order_status'] }, "driver": { "name": row['driver_name'], "status": row['driver_status'], "phone": row['driver_phone'] } } assignments.append(assignment) if assignment_id and len(assignments) == 1: # Single assignment query return { "success": True, "assignment": assignments[0] } else: # Multiple assignments return { "success": True, "count": len(assignments), "assignments": assignments } except Exception as e: logger.error(f"Failed to get assignment details: {e}") return { "success": False, "error": f"Failed to get assignment details: {str(e)}" } def handle_update_assignment(tool_input: dict) -> dict: """ Update assignment status Allows updating assignment status and actual metrics. Manages cascading updates to order and driver statuses. Args: tool_input: Dict with assignment_id, status (optional), actual_arrival (optional), notes (optional) Returns: Update result """ from datetime import datetime assignment_id = (tool_input.get("assignment_id") or "").strip() new_status = (tool_input.get("status") or "").strip().lower() actual_arrival = tool_input.get("actual_arrival") notes = (tool_input.get("notes") or "").strip() if not assignment_id: return { "success": False, "error": "assignment_id is required" } if not new_status and not actual_arrival and not notes: return { "success": False, "error": "Provide at least one field to update: status, actual_arrival, or notes" } # Validate status if provided valid_statuses = ["active", "in_progress", "completed", "failed", "cancelled"] if new_status and new_status not in valid_statuses: return { "success": False, "error": f"Invalid status. Must be one of: {', '.join(valid_statuses)}" } logger.info(f"Updating assignment: {assignment_id}, status={new_status}") try: conn = get_db_connection() cursor = conn.cursor() # Get current assignment details cursor.execute(""" SELECT status, order_id, driver_id FROM assignments WHERE assignment_id = %s """, (assignment_id,)) assignment_row = cursor.fetchone() if not assignment_row: cursor.close() conn.close() return { "success": False, "error": f"Assignment not found: {assignment_id}" } current_status = assignment_row['status'] order_id = assignment_row['order_id'] driver_id = assignment_row['driver_id'] # Validate status transitions if new_status: # Cannot go backwards if current_status == "completed" and new_status in ["active", "in_progress"]: cursor.close() conn.close() return { "success": False, "error": "Cannot change status from 'completed' back to 'active' or 'in_progress'" } if current_status == "failed" and new_status != "failed": cursor.close() conn.close() return { "success": False, "error": "Cannot change status from 'failed'" } if current_status == "cancelled" and new_status != "cancelled": cursor.close() conn.close() return { "success": False, "error": "Cannot change status from 'cancelled'" } # Build update query updates = [] params = [] if new_status: updates.append("status = %s") params.append(new_status) if actual_arrival: updates.append("actual_arrival = %s") params.append(actual_arrival) if notes: updates.append("notes = %s") params.append(notes) params.append(assignment_id) # Update assignment cursor.execute(f""" UPDATE assignments SET {', '.join(updates)} WHERE assignment_id = %s """, params) # Handle cascading updates based on new status if new_status: if new_status in ["completed", "failed", "cancelled"]: # Update order status if new_status == "completed": cursor.execute(""" UPDATE orders SET status = 'delivered' WHERE order_id = %s """, (order_id,)) elif new_status == "failed": cursor.execute(""" UPDATE orders SET status = 'failed' WHERE order_id = %s """, (order_id,)) elif new_status == "cancelled": cursor.execute(""" UPDATE orders SET status = 'cancelled', assigned_driver_id = NULL WHERE order_id = %s """, (order_id,)) # Check if driver has other active assignments cursor.execute(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) other_assignments_count = cursor.fetchone()['count'] # If no other active assignments, set driver back to active if other_assignments_count == 0: cursor.execute(""" UPDATE drivers SET status = 'active' WHERE driver_id = %s """, (driver_id,)) conn.commit() cursor.close() conn.close() logger.info(f"Assignment updated successfully: {assignment_id}") return { "success": True, "assignment_id": assignment_id, "updated_fields": { "status": new_status if new_status else current_status, "actual_arrival": actual_arrival if actual_arrival else "not updated", "notes": notes if notes else "not updated" }, "message": f"Assignment {assignment_id} updated successfully" } except Exception as e: logger.error(f"Failed to update assignment: {e}") return { "success": False, "error": f"Failed to update assignment: {str(e)}" } def handle_unassign_order(tool_input: dict) -> dict: """ Unassign order (delete assignment) Removes assignment and reverts order/driver to original states. Args: tool_input: Dict with order_id or assignment_id, and confirm flag Returns: Unassignment result """ order_id = (tool_input.get("order_id") or "").strip() assignment_id = (tool_input.get("assignment_id") or "").strip() confirm = tool_input.get("confirm", False) if not order_id and not assignment_id: return { "success": False, "error": "Provide either order_id or assignment_id" } if not confirm: return { "success": False, "error": "Unassignment requires confirm=true for safety" } logger.info(f"Unassigning: order_id={order_id}, assignment_id={assignment_id}") try: conn = get_db_connection() cursor = conn.cursor() # Find assignment if assignment_id: cursor.execute(""" SELECT order_id, driver_id, status FROM assignments WHERE assignment_id = %s """, (assignment_id,)) else: cursor.execute(""" SELECT assignment_id, driver_id, status FROM assignments WHERE order_id = %s AND status IN ('active', 'in_progress') ORDER BY assigned_at DESC LIMIT 1 """, (order_id,)) assignment_row = cursor.fetchone() if not assignment_row: cursor.close() conn.close() return { "success": False, "error": "No active assignment found" } if assignment_id: found_order_id = assignment_row['order_id'] driver_id = assignment_row['driver_id'] status = assignment_row['status'] else: assignment_id = assignment_row['assignment_id'] driver_id = assignment_row['driver_id'] status = assignment_row['status'] found_order_id = order_id # Validate status (cannot unassign if in_progress without force) if status == "in_progress": cursor.close() conn.close() return { "success": False, "error": "Cannot unassign order with 'in_progress' status. Complete or fail the delivery first." } # Delete assignment cursor.execute(""" DELETE FROM assignments WHERE assignment_id = %s """, (assignment_id,)) # Revert order status to pending and clear assigned driver cursor.execute(""" UPDATE orders SET status = 'pending', assigned_driver_id = NULL WHERE order_id = %s """, (found_order_id,)) # Check if driver has other active assignments cursor.execute(""" SELECT COUNT(*) FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') """, (driver_id,)) other_assignments_count = cursor.fetchone()[0] # If no other active assignments, set driver back to active if other_assignments_count == 0: cursor.execute(""" UPDATE drivers SET status = 'active' WHERE driver_id = %s """, (driver_id,)) conn.commit() cursor.close() conn.close() logger.info(f"Assignment removed successfully: {assignment_id}") return { "success": True, "assignment_id": assignment_id, "order_id": found_order_id, "driver_id": driver_id, "message": f"Order {found_order_id} unassigned from driver {driver_id}. Order status reverted to 'pending'." } except Exception as e: logger.error(f"Failed to unassign order: {e}") return { "success": False, "error": f"Failed to unassign order: {str(e)}" } def handle_complete_delivery(tool_input: dict) -> dict: """ Complete a delivery and automatically update driver location Marks delivery as completed, updates order/driver statuses, and moves driver location to the delivery address. Args: tool_input: Dict with assignment_id, confirm flag, and optional fields Returns: Completion result """ from datetime import datetime assignment_id = (tool_input.get("assignment_id") or "").strip() confirm = tool_input.get("confirm", False) actual_distance_meters = tool_input.get("actual_distance_meters") notes = (tool_input.get("notes") or "").strip() if not assignment_id: return { "success": False, "error": "assignment_id is required" } if not confirm: return { "success": False, "error": "Delivery completion requires confirm=true for safety" } logger.info(f"Completing delivery: assignment_id={assignment_id}") try: conn = get_db_connection() cursor = conn.cursor() # Get assignment and order details including timing fields cursor.execute(""" SELECT a.status, a.order_id, a.driver_id, a.delivery_location_lat, a.delivery_location_lng, a.delivery_address, o.customer_name, o.expected_delivery_time, o.sla_grace_period_minutes, d.name as driver_name FROM assignments a JOIN orders o ON a.order_id = o.order_id JOIN drivers d ON a.driver_id = d.driver_id WHERE a.assignment_id = %s """, (assignment_id,)) assignment_row = cursor.fetchone() if not assignment_row: cursor.close() conn.close() return { "success": False, "error": f"Assignment not found: {assignment_id}" } status = assignment_row['status'] order_id = assignment_row['order_id'] driver_id = assignment_row['driver_id'] delivery_lat = assignment_row['delivery_location_lat'] delivery_lng = assignment_row['delivery_location_lng'] delivery_address = assignment_row['delivery_address'] customer_name = assignment_row['customer_name'] driver_name = assignment_row['driver_name'] expected_delivery_time = assignment_row['expected_delivery_time'] sla_grace_period_minutes = assignment_row['sla_grace_period_minutes'] or 15 # Validate status if status not in ["active", "in_progress"]: cursor.close() conn.close() return { "success": False, "error": f"Cannot complete delivery: assignment status is '{status}'. Must be 'active' or 'in_progress'." } # Validate delivery location exists if not delivery_lat or not delivery_lng: cursor.close() conn.close() return { "success": False, "error": "Cannot complete delivery: delivery location coordinates are missing" } # Current timestamp for completion completion_time = datetime.now() # Step 1: Update assignment to completed update_fields = ["status = %s", "actual_arrival = %s", "updated_at = %s"] params = ["completed", completion_time, completion_time] if actual_distance_meters: update_fields.append("actual_distance_meters = %s") params.append(actual_distance_meters) if notes: update_fields.append("notes = %s") params.append(notes) params.append(assignment_id) cursor.execute(f""" UPDATE assignments SET {', '.join(update_fields)} WHERE assignment_id = %s """, tuple(params)) # Step 2: Update driver location to delivery address cursor.execute(""" UPDATE drivers SET current_lat = %s, current_lng = %s, last_location_update = %s, updated_at = %s WHERE driver_id = %s """, (delivery_lat, delivery_lng, completion_time, completion_time, driver_id)) logger.info(f"Driver {driver_id} location updated to delivery address: ({delivery_lat}, {delivery_lng})") # Step 3: Calculate delivery performance status delivery_status = "on_time" # Default timing_info = { "expected_delivery_time": expected_delivery_time.isoformat() if expected_delivery_time else None, "actual_delivery_time": completion_time.isoformat(), "sla_grace_period_minutes": sla_grace_period_minutes } if expected_delivery_time: # Calculate grace period deadline from datetime import timedelta grace_deadline = expected_delivery_time + timedelta(minutes=sla_grace_period_minutes) if completion_time <= expected_delivery_time: delivery_status = "on_time" timing_info["status"] = "On-time delivery" timing_info["delay_minutes"] = 0 elif completion_time <= grace_deadline: delivery_status = "late" delay_minutes = int((completion_time - expected_delivery_time).total_seconds() / 60) timing_info["status"] = f"Late (within grace period)" timing_info["delay_minutes"] = delay_minutes else: delivery_status = "very_late" delay_minutes = int((completion_time - expected_delivery_time).total_seconds() / 60) timing_info["status"] = f"Very late (SLA violation)" timing_info["delay_minutes"] = delay_minutes # Step 4: Update order status to delivered with timing info cursor.execute(""" UPDATE orders SET status = 'delivered', delivered_at = %s, delivery_status = %s, updated_at = %s WHERE order_id = %s """, (completion_time, delivery_status, completion_time, order_id)) logger.info(f"Order {order_id} marked as delivered with status '{delivery_status}'") # Step 4: Check if driver has other active assignments cursor.execute(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) other_assignments_count = cursor.fetchone()['count'] # Step 5: If no other active assignments, set driver to active cascading_actions = [] if other_assignments_count == 0: cursor.execute(""" UPDATE drivers SET status = 'active', updated_at = %s WHERE driver_id = %s """, (completion_time, driver_id)) cascading_actions.append(f"Driver {driver_name} set to 'active' (no other assignments)") else: cascading_actions.append(f"Driver {driver_name} remains 'busy' ({other_assignments_count} other active assignment(s))") conn.commit() cursor.close() conn.close() logger.info(f"Delivery completed successfully: {assignment_id}") return { "success": True, "assignment_id": assignment_id, "order_id": order_id, "driver_id": driver_id, "customer_name": customer_name, "driver_name": driver_name, "completed_at": completion_time.isoformat(), "delivery_status": delivery_status, "timing": timing_info, "delivery_location": { "lat": float(delivery_lat), "lng": float(delivery_lng), "address": delivery_address }, "driver_updated": { "new_location": f"{delivery_lat}, {delivery_lng}", "location_updated_at": completion_time.isoformat() }, "cascading_actions": cascading_actions, "message": f"Delivery completed! Order {order_id} delivered by {driver_name}. Status: {timing_info.get('status', delivery_status)}. Driver location updated to delivery address." } except Exception as e: logger.error(f"Failed to complete delivery: {e}") return { "success": False, "error": f"Failed to complete delivery: {str(e)}" } def handle_fail_delivery(tool_input: dict) -> dict: """ Mark delivery as failed with mandatory location and reason Driver must provide current GPS location and failure reason. Updates driver location to reported coordinates and sets statuses accordingly. Args: tool_input: Dict with assignment_id, current_lat, current_lng, failure_reason, confirm flag, and optional notes Returns: Failure recording result """ from datetime import datetime assignment_id = (tool_input.get("assignment_id") or "").strip() current_lat = tool_input.get("current_lat") current_lng = tool_input.get("current_lng") failure_reason = (tool_input.get("failure_reason") or "").strip() confirm = tool_input.get("confirm", False) notes = (tool_input.get("notes") or "").strip() # Validation if not assignment_id: return { "success": False, "error": "assignment_id is required" } if not confirm: return { "success": False, "error": "Delivery failure requires confirm=true for safety" } if current_lat is None or current_lng is None: return { "success": False, "error": "Driver must provide current location (current_lat and current_lng required)" } if not failure_reason: return { "success": False, "error": "Failure reason is required. Valid reasons: customer_not_available, wrong_address, refused_delivery, damaged_goods, payment_issue, vehicle_breakdown, access_restricted, weather_conditions, other" } # Validate failure_reason is one of the allowed values valid_reasons = [ "customer_not_available", "wrong_address", "refused_delivery", "damaged_goods", "payment_issue", "vehicle_breakdown", "access_restricted", "weather_conditions", "other" ] if failure_reason not in valid_reasons: return { "success": False, "error": f"Invalid failure_reason '{failure_reason}'. Must be one of: {', '.join(valid_reasons)}" } # Validate coordinates are valid try: current_lat = float(current_lat) current_lng = float(current_lng) if not (-90 <= current_lat <= 90) or not (-180 <= current_lng <= 180): return { "success": False, "error": "Invalid GPS coordinates. Latitude must be -90 to 90, longitude must be -180 to 180" } except (ValueError, TypeError): return { "success": False, "error": "current_lat and current_lng must be valid numbers" } logger.info(f"Failing delivery: assignment_id={assignment_id}, reason={failure_reason}") try: conn = get_db_connection() cursor = conn.cursor() # Get assignment and order details including timing fields cursor.execute(""" SELECT a.status, a.order_id, a.driver_id, a.delivery_address, o.customer_name, o.expected_delivery_time, o.sla_grace_period_minutes, d.name as driver_name FROM assignments a JOIN orders o ON a.order_id = o.order_id JOIN drivers d ON a.driver_id = d.driver_id WHERE a.assignment_id = %s """, (assignment_id,)) assignment_row = cursor.fetchone() if not assignment_row: cursor.close() conn.close() return { "success": False, "error": f"Assignment not found: {assignment_id}" } status = assignment_row['status'] order_id = assignment_row['order_id'] driver_id = assignment_row['driver_id'] delivery_address = assignment_row['delivery_address'] customer_name = assignment_row['customer_name'] driver_name = assignment_row['driver_name'] expected_delivery_time = assignment_row['expected_delivery_time'] sla_grace_period_minutes = assignment_row['sla_grace_period_minutes'] or 15 # Validate status if status not in ["active", "in_progress"]: cursor.close() conn.close() return { "success": False, "error": f"Cannot fail delivery: assignment status is '{status}'. Must be 'active' or 'in_progress'." } # Current timestamp for failure failure_time = datetime.now() # Step 1: Update assignment to failed update_fields = [ "status = %s", "failure_reason = %s", "actual_arrival = %s", "updated_at = %s" ] params = ["failed", failure_reason, failure_time, failure_time] if notes: update_fields.append("notes = %s") params.append(notes) params.append(assignment_id) cursor.execute(f""" UPDATE assignments SET {', '.join(update_fields)} WHERE assignment_id = %s """, tuple(params)) # Step 2: Update driver location to reported current location cursor.execute(""" UPDATE drivers SET current_lat = %s, current_lng = %s, last_location_update = %s, updated_at = %s WHERE driver_id = %s """, (current_lat, current_lng, failure_time, failure_time, driver_id)) logger.info(f"Driver {driver_id} location updated to reported position: ({current_lat}, {current_lng})") # Step 3: Calculate delivery performance status for failure delivery_status = "failed_on_time" # Default - failed but before deadline timing_info = { "expected_delivery_time": expected_delivery_time.isoformat() if expected_delivery_time else None, "failure_time": failure_time.isoformat(), "sla_grace_period_minutes": sla_grace_period_minutes } if expected_delivery_time: if failure_time <= expected_delivery_time: delivery_status = "failed_on_time" timing_info["status"] = "Failed before deadline (attempted delivery on time)" else: delivery_status = "failed_late" delay_minutes = int((failure_time - expected_delivery_time).total_seconds() / 60) timing_info["status"] = f"Failed after deadline (late attempt)" timing_info["delay_minutes"] = delay_minutes # Step 4: Update order status to failed with timing info cursor.execute(""" UPDATE orders SET status = 'failed', delivered_at = %s, delivery_status = %s, updated_at = %s WHERE order_id = %s """, (failure_time, delivery_status, failure_time, order_id)) logger.info(f"Order {order_id} marked as failed with status '{delivery_status}'") # Step 4: Check if driver has other active assignments cursor.execute(""" SELECT COUNT(*) as count FROM assignments WHERE driver_id = %s AND status IN ('active', 'in_progress') AND assignment_id != %s """, (driver_id, assignment_id)) other_assignments_count = cursor.fetchone()['count'] # Step 5: If no other active assignments, set driver to active cascading_actions = [] if other_assignments_count == 0: cursor.execute(""" UPDATE drivers SET status = 'active', updated_at = %s WHERE driver_id = %s """, (failure_time, driver_id)) cascading_actions.append(f"Driver {driver_name} set to 'active' (no other assignments)") else: cascading_actions.append(f"Driver {driver_name} remains 'busy' ({other_assignments_count} other active assignment(s))") conn.commit() cursor.close() conn.close() logger.info(f"Delivery marked as failed: {assignment_id}") # Format failure reason for display reason_display = failure_reason.replace("_", " ").title() return { "success": True, "assignment_id": assignment_id, "order_id": order_id, "driver_id": driver_id, "customer_name": customer_name, "driver_name": driver_name, "failed_at": failure_time.isoformat(), "failure_reason": failure_reason, "failure_reason_display": reason_display, "delivery_status": delivery_status, "timing": timing_info, "delivery_address": delivery_address, "driver_location": { "lat": current_lat, "lng": current_lng, "updated_at": failure_time.isoformat() }, "cascading_actions": cascading_actions, "message": f"Delivery failed for order {order_id}. Reason: {reason_display}. Timing: {timing_info.get('status', delivery_status)}. Driver {driver_name} location updated to ({current_lat}, {current_lng})." } except Exception as e: logger.error(f"Failed to record delivery failure: {e}") return { "success": False, "error": f"Failed to record delivery failure: {str(e)}" } def get_tools_list() -> list: """Get list of available tools""" return [tool["name"] for tool in TOOLS_SCHEMA] def get_tool_description(tool_name: str) -> str: """Get description for a specific tool""" for tool in TOOLS_SCHEMA: if tool["name"] == tool_name: return tool["description"] return ""