"""
FleetMind Dispatch Coordinator - MCP Server
Industry-standard Model Context Protocol server for delivery dispatch management

Provides 18 AI tools for order and driver management via standardized MCP protocol.
Compatible with Claude Desktop, Continue, Cline, and all MCP clients.
"""

import os
import sys
import json
import logging
from pathlib import Path
from typing import Literal
from datetime import datetime

# Add project root to path
sys.path.insert(0, str(Path(__file__).parent))

from fastmcp import FastMCP
from fastmcp.server.auth import RemoteAuthProvider, TokenVerifier, AccessToken
from pydantic import AnyHttpUrl

# Import existing services (unchanged)
from chat.geocoding import GeocodingService
from database.connection import execute_query, execute_write, test_connection

# Import authentication module
from database.user_context import verify_token, check_permission, get_required_scope

# Configure logging
# FileHandler opens its target file as soon as it is constructed, so a missing
# "logs" directory would abort the import of this module. Create it up front.
Path('logs').mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/fleetmind_mcp.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# ============================================================================
# OAUTH AUTHENTICATION SETUP
# ============================================================================

class StytchTokenVerifier(TokenVerifier):
    """Token verifier for Stytch session tokens.

    Adapts the project's ``verify_token`` helper (imported from
    ``database.user_context``) to the FastMCP ``TokenVerifier`` interface.
    """

    def __init__(self):
        """Configure the verifier with the server URL taken from SERVER_URL."""
        super().__init__(
            base_url=os.getenv('SERVER_URL', 'http://localhost:7860'),
            required_scopes=None  # Scope checking handled per-tool
        )

    async def verify_token(self, token: str) -> AccessToken | None:
        """Verify Stytch session token and return AccessToken.

        Args:
            token: Raw token string presented by the client.

        Returns:
            An ``AccessToken`` built from the verified user info, or ``None``
            when verification fails or raises.
        """
        try:
            # Inside this method the bare name resolves to the module-level
            # verify_token imported above, not to this method.
            user_info = verify_token(token)

            if not user_info:
                logger.debug("Token verification failed")
                return None

            # Convert to FastMCP AccessToken format.
            # 'user_id' and 'email' are required keys here: if either is
            # missing the KeyError is caught below and the token is rejected.
            access_token = AccessToken(
                token=token,
                client_id=user_info['user_id'],
                scopes=user_info.get('scopes', []),
                resource_owner=user_info.get('email'),
                claims={
                    'user_id': user_info['user_id'],
                    'email': user_info['email'],
                    'name': user_info.get('name', 'Unknown User'),
                }
            )

            logger.info(f"Token verified for user: {user_info['email']} (user_id: {user_info['user_id']})")
            return access_token

        except Exception as e:
            # Any failure while verifying is treated as "not authenticated".
            logger.error(f"Token verification error: {e}")
            return None


# Create OAuth authentication provider (but don't apply globally yet)
auth_provider = RemoteAuthProvider(
    token_verifier=StytchTokenVerifier(),
    authorization_servers=[AnyHttpUrl('https://test.stytch.com/v1/public')],
    base_url=os.getenv('SERVER_URL', 'http://localhost:7860'),
    resource_name="FleetMind Dispatch Coordinator"
)

logger.info("OAuth authentication provider configured with Stytch")

# ============================================================================
# MCP SERVER INITIALIZATION
# ============================================================================

# NOTE: Not using auth=auth_provider here because it would block SSE connections
# Instead, we manually verify tokens in each tool using get_authenticated_user()
mcp = FastMCP(
    name="FleetMind Dispatch Coordinator",
    version="1.0.0"
)

# Initialize shared services
logger.info("Initializing FleetMind MCP Server...")
geocoding_service = GeocodingService()
logger.info(f"Geocoding Service: {geocoding_service.get_status()}")

# Test database connection
# A failed connection is logged but does not stop the server from loading.
try:
    test_connection()
    logger.info("Database: Connected to PostgreSQL")
except Exception as e:
    logger.error(f"Database: Connection failed - {e}")

# ============================================================================
# AUTHENTICATION
# ============================================================================

# Note: OAuth metadata endpoint will be added via app.py instead of here
# FastMCP doesn't expose direct app access for custom routes

def get_authenticated_user() -> dict | None:
    """
    Extract and verify user from authentication token via FastMCP

    When the SKIP_AUTH environment variable equals "true", a fixed development
    user with the 'admin' scope is returned without consulting any token.

    Returns:
        User info dict with user_id, email, scopes, name or None if not authenticated
    """
    try:
        # Development bypass mode
        if os.getenv("SKIP_AUTH") == "true":
            logger.debug("SKIP_AUTH enabled - using development user")
            return {
                'user_id': 'dev-user',
                'email': 'dev@fleetmind.local',
                'scopes': ['admin'],
                'name': 'Development User'
            }

        # Extract token from FastMCP request context
        from fastmcp.server.dependencies import get_access_token

        access_token = get_access_token()

        if not access_token:
            logger.debug("No access token found in request context")
            return None

        # NOTE(review): the FastMCP instance in this module is created without
        # auth=..., so whatever populates the access token must be wired up
        # elsewhere (e.g. app.py) - confirm.

        # Read optional attributes defensively: if the AccessToken model does
        # not carry `resource_owner` (or `claims` is missing/None), a direct
        # attribute access would raise and be reported as "not authenticated"
        # by the broad handler below even though a token is present.
        claims = getattr(access_token, 'claims', None) or {}
        resource_owner = getattr(access_token, 'resource_owner', None)

        user_info = {
            'user_id': claims.get('user_id') or access_token.client_id,
            'email': resource_owner or claims.get('email', 'unknown'),
            'scopes': access_token.scopes or [],
            'name': claims.get('name', 'Unknown User')
        }

        logger.info(f"Authenticated user: {user_info['email']} (user_id: {user_info['user_id']})")
        return user_info

    except RuntimeError as e:
        # No HTTP request context available (likely stdio mode or testing)
        logger.debug(f"No request context available: {e}")
        return None
    except Exception as e:
        logger.error(f"Authentication error: {e}")
        return None


# ============================================================================
# MCP RESOURCES
# ============================================================================

# NOTE(review): unlike the tools, the two resources below perform no
# authentication and do not filter by user - confirm this is intended.

@mcp.resource("orders://all")
def get_orders_resource() -> str:
    """
    Real-time orders dataset for AI context.
    Returns all orders from the last 30 days.

    Returns:
        JSON string containing orders array with key fields:
        - order_id, customer_name, delivery_address
        - status, priority, created_at, assigned_driver_id
    """
    try:
        query = """
            SELECT order_id, customer_name, delivery_address,
                   status, priority, created_at, assigned_driver_id
            FROM orders
            WHERE created_at > NOW() - INTERVAL '30 days'
            ORDER BY created_at DESC
            LIMIT 1000
        """
        orders = execute_query(query)
        logger.info(f"Resource orders://all - Retrieved {len(orders) if orders else 0} orders")
        # Serialise a falsy result (None / empty) as an empty array so the
        # payload always matches the documented "orders array" shape.
        return json.dumps(orders or [], default=str, indent=2)
    except Exception as e:
        logger.error(f"Resource orders://all failed: {e}")
        return json.dumps({"error": str(e)})


@mcp.resource("drivers://all")
def get_drivers_resource() -> str:
    """
    Real-time drivers dataset for AI context.
    Returns all drivers with current locations and status.

    Returns:
        JSON string containing drivers array with key fields:
        - driver_id, name, status, vehicle_type, vehicle_plate
        - current_lat, current_lng, last_location_update
    """
    try:
        query = """
            SELECT driver_id, name, status, vehicle_type, vehicle_plate,
                   current_lat, current_lng, last_location_update
            FROM drivers
            ORDER BY name ASC
        """
        drivers = execute_query(query)
        logger.info(f"Resource drivers://all - Retrieved {len(drivers) if drivers else 0} drivers")
        # Same empty-array normalisation as the orders resource.
        return json.dumps(drivers or [], default=str, indent=2)
    except Exception as e:
        logger.error(f"Resource drivers://all failed: {e}")
        return json.dumps({"error": str(e)})


# ============================================================================
# MCP PROMPTS (Workflows)
# ============================================================================

# TODO: Add prompts once FastMCP prompt API is confirmed
# Prompts will provide guided workflows for:
# - create_order_workflow: Interactive order creation with validation
# - assign_driver_workflow: Smart driver assignment with route optimization
# - order_status_check: Quick order status queries

# ============================================================================
MCP TOOLS - ORDER CREATION & VALIDATION # ============================================================================ @mcp.tool() def geocode_address(address: str) -> dict: """ Convert a delivery address to GPS coordinates and validate the address format. Use this before creating an order to ensure the address is valid. Args: address: The full delivery address to geocode (e.g., '123 Main St, San Francisco, CA') Returns: dict: { success: bool, latitude: float, longitude: float, formatted_address: str, confidence: str (high/medium/low), message: str } """ from chat.tools import handle_geocode_address logger.info(f"Tool: geocode_address('{address}')") return handle_geocode_address({"address": address}) @mcp.tool() def calculate_route( origin: str, destination: str, mode: Literal["driving", "walking", "bicycling", "transit"] = "driving", vehicle_type: Literal["car", "van", "truck", "motorcycle", "bicycle"] = "car", alternatives: bool = False, include_steps: bool = False, avoid_tolls: bool = False, avoid_highways: bool = False, avoid_ferries: bool = False, emission_type: Literal["GASOLINE", "ELECTRIC", "HYBRID", "DIESEL"] = "GASOLINE", request_fuel_efficient: bool = False ) -> dict: """ Calculate the shortest route between two locations with vehicle-specific optimization. Uses Google Routes API for accurate real-time traffic, toll info, and fuel consumption. 
Args: origin: Starting location - either full address or coordinates as 'lat,lng' destination: Destination location - either full address or coordinates as 'lat,lng' mode: Travel mode for route calculation (default: driving) vehicle_type: Type of vehicle for route optimization (default: car) - motorcycle: Uses TWO_WHEELER mode for motorcycle-specific routing - bicycle: Uses bike lanes and paths - car/van/truck: Uses DRIVE mode (no truck-specific routing available) alternatives: Return multiple route options if available (default: false) include_steps: Include turn-by-turn navigation steps in response (default: false) avoid_tolls: Avoid toll roads (for cars and motorcycles) (default: false) avoid_highways: Avoid highways (for cars and motorcycles) (default: false) avoid_ferries: Avoid ferry routes (for cars and motorcycles) (default: false) emission_type: Vehicle emission type for eco-routing (cars/vans/trucks only) (default: GASOLINE) request_fuel_efficient: Request eco-friendly route alternative (cars/vans/trucks only) (default: false) Returns: dict: { success: bool, origin: str, destination: str, distance: {meters: int, text: str}, duration: {seconds: int, text: str} (without traffic), duration_in_traffic: {seconds: int, text: str} (with traffic), traffic_delay: {seconds: int, text: str}, mode: str, vehicle_type: str, route_summary: str, route_labels: list, confidence: str, toll_info: {has_tolls: bool, details: str} (if applicable), fuel_consumption: {liters: float, text: str} (if DRIVE mode), traffic_data_available: bool, warning: str (if TWO_WHEELER or BICYCLE mode), steps: list (if include_steps=True) } """ from chat.tools import handle_calculate_route logger.info(f"Tool: calculate_route('{origin}' -> '{destination}', vehicle={vehicle_type}, mode={mode})") return handle_calculate_route({ "origin": origin, "destination": destination, "mode": mode, "vehicle_type": vehicle_type, "alternatives": alternatives, "include_steps": include_steps, "avoid_tolls": 
avoid_tolls, "avoid_highways": avoid_highways, "avoid_ferries": avoid_ferries, "emission_type": emission_type, "request_fuel_efficient": request_fuel_efficient }) @mcp.tool() def calculate_intelligent_route( origin: str, destination: str, vehicle_type: Literal["car", "van", "truck", "motorcycle"] = "car", consider_weather: bool = True, consider_traffic: bool = True ) -> dict: """ Calculate the optimal route considering real-time traffic, weather conditions, and vehicle type. This is an intelligent routing tool that factors in: - Real-time traffic delays - Weather conditions (rain, visibility, wind) - Vehicle-specific capabilities (motorcycle vs car vs truck) - Safety warnings and recommendations Use this when you need smart routing that accounts for current conditions. Args: origin: Starting location - either full address or coordinates as 'lat,lng' destination: Destination location - either full address or coordinates as 'lat,lng' vehicle_type: Type of vehicle for route optimization (default: car) consider_weather: Factor in weather conditions (default: true) consider_traffic: Factor in real-time traffic (default: true) Returns: dict: { success: bool, route: { origin: str, destination: str, distance: {meters: int, text: str}, vehicle_type: str, route_summary: str }, timing: { base_duration: {seconds: int, text: str}, with_traffic: {seconds: int, text: str}, adjusted_duration: {seconds: int, text: str}, traffic_delay_percent: int, weather_delay_percent: int, total_delay_percent: int }, conditions: { traffic_status: str (clear|light|moderate|heavy|severe), weather_considered: bool }, weather: { conditions: str, temperature_c: float, precipitation_mm: float, visibility_m: int, impact_severity: str (none|minor|moderate|severe) }, recommendations: list[str], warnings: list[str], alternatives: list (if available) } Examples: - "Find the best route from SF to Oakland for a motorcycle considering weather" - "What's the fastest route from downtown to airport with current 
traffic?" - "Calculate delivery route for a truck from warehouse to customer address" """ from chat.route_optimizer import calculate_intelligent_route as calc_route logger.info(f"Tool: calculate_intelligent_route('{origin}' -> '{destination}', vehicle={vehicle_type})") return calc_route(origin, destination, vehicle_type, consider_weather, consider_traffic) @mcp.tool() def create_order( customer_name: str, delivery_address: str, delivery_lat: float, delivery_lng: float, expected_delivery_time: str, customer_phone: str | None = None, customer_email: str | None = None, priority: Literal["standard", "express", "urgent"] = "standard", weight_kg: float = 5.0, special_instructions: str | None = None, sla_grace_period_minutes: int = 15, time_window_end: str | None = None ) -> dict: """ Create a new delivery order in the database with MANDATORY delivery deadline. IMPORTANT: expected_delivery_time is REQUIRED. This is the promised delivery time to the customer. Only call this after geocoding the address successfully. Args: customer_name: Full name of the customer delivery_address: Complete delivery address delivery_lat: Latitude from geocoding delivery_lng: Longitude from geocoding expected_delivery_time: REQUIRED - Promised delivery deadline in ISO 8601 format Must be future timestamp. 
Examples: - '2025-11-15T18:00:00' (6 PM today) - '2025-11-16T12:00:00' (noon tomorrow) customer_phone: Customer phone number (optional) customer_email: Customer email address (optional) priority: Delivery priority level (default: standard) weight_kg: Package weight in kilograms (default: 5.0) special_instructions: Special delivery instructions (optional) sla_grace_period_minutes: Grace period after deadline (default: 15 mins) Deliveries within grace period marked as 'late' but acceptable time_window_end: Legacy field, defaults to expected_delivery_time if not provided Returns: dict: { success: bool, order_id: str, status: str, customer: str, address: str, expected_delivery: str (new), sla_grace_period_minutes: int (new), priority: str, message: str } """ # STEP 1: Authenticate user user = get_authenticated_user() if not user: return { "success": False, "error": "Authentication required. Please login first.", "auth_required": True } # STEP 2: Check permissions required_scope = get_required_scope('create_order') if not check_permission(user.get('scopes', []), required_scope): return { "success": False, "error": f"Permission denied. 
Required scope: {required_scope}" } # STEP 3: Execute tool with user_id from chat.tools import handle_create_order logger.info(f"Tool: create_order by user {user.get('email')} (customer='{customer_name}')") return handle_create_order( tool_input={ "customer_name": customer_name, "delivery_address": delivery_address, "delivery_lat": delivery_lat, "delivery_lng": delivery_lng, "expected_delivery_time": expected_delivery_time, "customer_phone": customer_phone, "customer_email": customer_email, "priority": priority, "weight_kg": weight_kg, "special_instructions": special_instructions, "sla_grace_period_minutes": sla_grace_period_minutes, "time_window_end": time_window_end }, user_id=user['user_id'] # Pass user_id for data isolation ) # ============================================================================ # MCP TOOLS - ORDER QUERYING # ============================================================================ @mcp.tool() def count_orders( status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None, priority: Literal["standard", "express", "urgent"] | None = None, payment_status: Literal["pending", "paid", "cod"] | None = None, assigned_driver_id: str | None = None, is_fragile: bool | None = None, requires_signature: bool | None = None, requires_cold_storage: bool | None = None ) -> dict: """ Count total orders in the database with optional filters. Use this when user asks 'how many orders', 'fetch orders', or wants to know order statistics. 
Args: status: Filter by order status (optional) priority: Filter by priority level (optional) payment_status: Filter by payment status (optional) assigned_driver_id: Filter by assigned driver ID (optional) is_fragile: Filter fragile packages only (optional) requires_signature: Filter orders requiring signature (optional) requires_cold_storage: Filter orders requiring cold storage (optional) Returns: dict: { success: bool, total: int, status_breakdown: dict, priority_breakdown: dict, message: str } """ from chat.tools import handle_count_orders logger.info(f"Tool: count_orders(status={status}, priority={priority})") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('count_orders') if not check_permission(user.get('scopes', []), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id tool_input = {} if status is not None: tool_input["status"] = status if priority is not None: tool_input["priority"] = priority if payment_status is not None: tool_input["payment_status"] = payment_status if assigned_driver_id is not None: tool_input["assigned_driver_id"] = assigned_driver_id if is_fragile is not None: tool_input["is_fragile"] = is_fragile if requires_signature is not None: tool_input["requires_signature"] = requires_signature if requires_cold_storage is not None: tool_input["requires_cold_storage"] = requires_cold_storage return handle_count_orders(tool_input, user_id=user['user_id']) @mcp.tool() def fetch_orders( limit: int = 10, offset: int = 0, status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None, priority: Literal["standard", "express", "urgent"] | None = None, payment_status: Literal["pending", "paid", "cod"] | None = None, assigned_driver_id: str | None = None, is_fragile: bool | None = None, requires_signature: bool | 
None = None, requires_cold_storage: bool | None = None, sort_by: Literal["created_at", "priority", "time_window_start"] = "created_at", sort_order: Literal["ASC", "DESC"] = "DESC" ) -> dict: """ Fetch orders from the database with optional filters, pagination, and sorting. Use after counting to show specific number of orders. Args: limit: Number of orders to fetch (default: 10, max: 100) offset: Number of orders to skip for pagination (default: 0) status: Filter by order status (optional) priority: Filter by priority level (optional) payment_status: Filter by payment status (optional) assigned_driver_id: Filter by assigned driver ID (optional) is_fragile: Filter fragile packages only (optional) requires_signature: Filter orders requiring signature (optional) requires_cold_storage: Filter orders requiring cold storage (optional) sort_by: Field to sort by (default: created_at) sort_order: Sort order (default: DESC for newest first) Returns: dict: { success: bool, orders: list[dict], count: int, message: str } """ from chat.tools import handle_fetch_orders logger.info(f"Tool: fetch_orders(limit={limit}, offset={offset}, status={status})") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('fetch_orders') if not check_permission(user.get('scopes', []), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id tool_input = { "limit": limit, "offset": offset, "sort_by": sort_by, "sort_order": sort_order } if status is not None: tool_input["status"] = status if priority is not None: tool_input["priority"] = priority if payment_status is not None: tool_input["payment_status"] = payment_status if assigned_driver_id is not None: tool_input["assigned_driver_id"] = assigned_driver_id if is_fragile is not None: tool_input["is_fragile"] = is_fragile if requires_signature is not None: 
tool_input["requires_signature"] = requires_signature if requires_cold_storage is not None: tool_input["requires_cold_storage"] = requires_cold_storage return handle_fetch_orders(tool_input, user_id=user['user_id']) @mcp.tool() def get_order_details(order_id: str) -> dict: """ Get complete details of a specific order by order ID. Use when user asks 'tell me about order X' or wants detailed information about a specific order. Args: order_id: The order ID to fetch details for (e.g., 'ORD-20251114163800') Returns: dict: { success: bool, order: dict (with all 26 fields), message: str } """ from chat.tools import handle_get_order_details logger.info(f"Tool: get_order_details(order_id='{order_id}')") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('get_order_details') if not check_permission(user.get('scopes', []), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id return handle_get_order_details({"order_id": order_id}, user_id=user['user_id']) @mcp.tool() def search_orders(search_term: str) -> dict: """ Search for orders by customer name, email, phone, or order ID pattern. Use when user provides partial information to find orders. 
Args: search_term: Search term to match against customer_name, customer_email, customer_phone, or order_id Returns: dict: { success: bool, orders: list[dict], count: int, message: str } """ from chat.tools import handle_search_orders logger.info(f"Tool: search_orders(search_term='{search_term}')") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('search_orders') if not check_permission(user.get('scopes', []), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id return handle_search_orders({"search_term": search_term}, user_id=user['user_id']) @mcp.tool() def get_incomplete_orders(limit: int = 20) -> dict: """ Get all orders that are not yet completed (excludes delivered and cancelled orders). Shortcut for finding orders in progress (pending, assigned, in_transit). Args: limit: Number of orders to fetch (default: 20) Returns: dict: { success: bool, orders: list[dict], count: int, message: str } """ from chat.tools import handle_get_incomplete_orders logger.info(f"Tool: get_incomplete_orders(limit={limit})") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('get_incomplete_orders') if not check_permission(user.get('scopes', []), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id return handle_get_incomplete_orders({"limit": limit}, user_id=user['user_id']) # ============================================================================ # MCP TOOLS - ORDER MANAGEMENT # ============================================================================ @mcp.tool() def update_order( order_id: str, customer_name: str | None = None, customer_phone: str | None = None, customer_email: 
str | None = None, delivery_address: str | None = None, delivery_lat: float | None = None, delivery_lng: float | None = None, status: Literal["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"] | None = None, priority: Literal["standard", "express", "urgent"] | None = None, special_instructions: str | None = None, time_window_end: str | None = None, payment_status: Literal["pending", "paid", "cod"] | None = None, weight_kg: float | None = None, order_value: float | None = None ) -> dict: """ Update an existing order's details. You can update any combination of fields. Only provide the fields you want to change. Auto-geocodes if delivery_address updated without coordinates. Args: order_id: Order ID to update (e.g., 'ORD-20250114123456') customer_name: Updated customer name (optional) customer_phone: Updated customer phone number (optional) customer_email: Updated customer email address (optional) delivery_address: Updated delivery address (optional) delivery_lat: Updated delivery latitude (required if updating address) (optional) delivery_lng: Updated delivery longitude (required if updating address) (optional) status: Updated order status (optional) priority: Updated priority level (optional) special_instructions: Updated special delivery instructions (optional) time_window_end: Updated delivery deadline (ISO format datetime) (optional) payment_status: Updated payment status (optional) weight_kg: Updated package weight in kilograms (optional) order_value: Updated order value in currency (optional) Returns: dict: { success: bool, order_id: str, updated_fields: list[str], message: str } """ from chat.tools import handle_update_order logger.info(f"Tool: update_order(order_id='{order_id}')") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('update_order') if not check_permission(user.get('scopes', 
[]), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id tool_input = {"order_id": order_id} if customer_name is not None: tool_input["customer_name"] = customer_name if customer_phone is not None: tool_input["customer_phone"] = customer_phone if customer_email is not None: tool_input["customer_email"] = customer_email if delivery_address is not None: tool_input["delivery_address"] = delivery_address if delivery_lat is not None: tool_input["delivery_lat"] = delivery_lat if delivery_lng is not None: tool_input["delivery_lng"] = delivery_lng if status is not None: tool_input["status"] = status if priority is not None: tool_input["priority"] = priority if special_instructions is not None: tool_input["special_instructions"] = special_instructions if time_window_end is not None: tool_input["time_window_end"] = time_window_end if payment_status is not None: tool_input["payment_status"] = payment_status if weight_kg is not None: tool_input["weight_kg"] = weight_kg if order_value is not None: tool_input["order_value"] = order_value return handle_update_order(tool_input, user_id=user['user_id']) @mcp.tool() def delete_order(order_id: str, confirm: bool) -> dict: """ Permanently delete an order from the database. This action cannot be undone. Use with caution. 
Args: order_id: Order ID to delete (e.g., 'ORD-20250114123456') confirm: Must be set to true to confirm deletion Returns: dict: { success: bool, order_id: str, message: str } """ from chat.tools import handle_delete_order logger.info(f"Tool: delete_order(order_id='{order_id}', confirm={confirm})") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('delete_order') if not check_permission(user.get('scopes', []), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id return handle_delete_order({"order_id": order_id, "confirm": confirm}, user_id=user['user_id']) # ============================================================================ # MCP TOOLS - DRIVER CREATION # ============================================================================ @mcp.tool() def create_driver( name: str, phone: str | None = None, email: str | None = None, vehicle_type: str = "van", vehicle_plate: str | None = None, capacity_kg: float = 1000.0, capacity_m3: float = 12.0, skills: list[str] | None = None, status: Literal["active", "busy", "offline", "unavailable"] = "active" ) -> dict: """ Create a new delivery driver in the database. Use this to onboard new drivers to the fleet. 
Args: name: Full name of the driver phone: Driver phone number (optional) email: Driver email address (optional) vehicle_type: Type of vehicle: van, truck, car, motorcycle (default: van) vehicle_plate: Vehicle license plate number (optional) capacity_kg: Vehicle cargo capacity in kilograms (default: 1000.0) capacity_m3: Vehicle cargo volume in cubic meters (default: 12.0) skills: List of driver skills/certifications: refrigerated, medical_certified, fragile_handler, overnight, express_delivery (optional) status: Driver status (default: active) Returns: dict: { success: bool, driver_id: str, name: str, status: str, vehicle_type: str, vehicle_plate: str, capacity_kg: float, skills: list[str], message: str } """ from chat.tools import handle_create_driver logger.info(f"Tool: create_driver(name='{name}', vehicle_type='{vehicle_type}')") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('create_driver') if not check_permission(user.get('scopes', []), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id return handle_create_driver({ "name": name, "phone": phone, "email": email, "vehicle_type": vehicle_type, "vehicle_plate": vehicle_plate, "capacity_kg": capacity_kg, "capacity_m3": capacity_m3, "skills": skills or [], "status": status }, user_id=user['user_id']) # ============================================================================ # MCP TOOLS - DRIVER QUERYING # ============================================================================ @mcp.tool() def count_drivers( status: Literal["active", "busy", "offline", "unavailable"] | None = None, vehicle_type: str | None = None ) -> dict: """ Count total drivers in the database with optional filters. Use this when user asks 'how many drivers', 'show drivers', or wants driver statistics. 
Args: status: Filter by driver status (optional) vehicle_type: Filter by vehicle type: van, truck, car, motorcycle, etc. (optional) Returns: dict: { success: bool, total: int, status_breakdown: dict, vehicle_breakdown: dict, message: str } """ from chat.tools import handle_count_drivers logger.info(f"Tool: count_drivers(status={status}, vehicle_type={vehicle_type})") # STEP 1: Authenticate user user = get_authenticated_user() if not user: return {"success": False, "error": "Authentication required"} # STEP 2: Check permissions required_scope = get_required_scope('count_drivers') if not check_permission(user.get('scopes', []), required_scope): return {"success": False, "error": "Permission denied"} # STEP 3: Execute with user_id tool_input = {} if status is not None: tool_input["status"] = status if vehicle_type is not None: tool_input["vehicle_type"] = vehicle_type return handle_count_drivers(tool_input, user_id=user['user_id']) @mcp.tool() def fetch_drivers( limit: int = 10, offset: int = 0, status: Literal["active", "busy", "offline", "unavailable"] | None = None, vehicle_type: str | None = None, sort_by: Literal["name", "status", "created_at", "last_location_update"] = "name", sort_order: Literal["ASC", "DESC"] = "ASC" ) -> dict: """ Fetch drivers from the database with optional filters, pagination, and sorting. Use after counting to show specific number of drivers. Args: limit: Number of drivers to fetch (default: 10, max: 100) offset: Number of drivers to skip for pagination (default: 0) status: Filter by driver status (optional) vehicle_type: Filter by vehicle type: van, truck, car, motorcycle, etc. 
@mcp.tool()
def get_driver_details(driver_id: str) -> dict:
    """
    Get complete details of a specific driver by driver ID, including current
    location (latitude, longitude, and human-readable address via reverse
    geocoding), contact info, vehicle details, status, and skills.

    Use when user asks about a driver's location, coordinates, position, or
    any other driver information.

    Args:
        driver_id: The driver ID to fetch details for
            (e.g., 'DRV-20251114163800')

    Returns:
        dict: {
            success: bool,
            driver: dict (with all fields including reverse-geocoded
                location address),
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_get_driver_details
    logger.info(f"Tool: get_driver_details(driver_id='{driver_id}')")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('get_driver_details')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # Delegate the lookup, scoped to the calling user.
    lookup = {"driver_id": driver_id}
    return handle_get_driver_details(lookup, user_id=caller['user_id'])
@mcp.tool()
def search_drivers(search_term: str) -> dict:
    """
    Search for drivers by name, email, phone, vehicle plate, or driver ID
    pattern. Use when user provides partial information to find drivers.

    Args:
        search_term: Search term to match against name, email, phone,
            vehicle_plate, or driver_id

    Returns:
        dict: {
            success: bool,
            drivers: list[dict],
            count: int,
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_search_drivers
    logger.info(f"Tool: search_drivers(search_term='{search_term}')")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('search_drivers')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # Delegate the search, scoped to the calling user.
    criteria = {"search_term": search_term}
    return handle_search_drivers(criteria, user_id=caller['user_id'])


@mcp.tool()
def get_available_drivers(limit: int = 20) -> dict:
    """
    Get all drivers that are available for assignment (active or offline
    status, excludes busy and unavailable). Shortcut for finding drivers
    ready for dispatch.

    Args:
        limit: Number of drivers to fetch (default: 20)

    Returns:
        dict: {
            success: bool,
            drivers: list[dict],
            count: int,
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_get_available_drivers
    logger.info(f"Tool: get_available_drivers(limit={limit})")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('get_available_drivers')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # Delegate the query, scoped to the calling user.
    request = {"limit": limit}
    return handle_get_available_drivers(request, user_id=caller['user_id'])
# ============================================================================
# MCP TOOLS - DRIVER MANAGEMENT
# ============================================================================

@mcp.tool()
def update_driver(
    driver_id: str,
    name: str | None = None,
    phone: str | None = None,
    email: str | None = None,
    status: Literal["active", "busy", "offline", "unavailable"] | None = None,
    vehicle_type: str | None = None,
    vehicle_plate: str | None = None,
    capacity_kg: float | None = None,
    capacity_m3: float | None = None,
    skills: list[str] | None = None,
    current_lat: float | None = None,
    current_lng: float | None = None
) -> dict:
    """
    Update an existing driver's details. You can update any combination of
    fields. Only provide the fields you want to change. Auto-updates
    last_location_update if coordinates changed.

    Args:
        driver_id: Driver ID to update (e.g., 'DRV-20250114123456')
        name: Updated driver name (optional)
        phone: Updated phone number (optional)
        email: Updated email address (optional)
        status: Updated driver status (optional)
        vehicle_type: Updated vehicle type (optional)
        vehicle_plate: Updated vehicle license plate (optional)
        capacity_kg: Updated cargo capacity in kilograms (optional)
        capacity_m3: Updated cargo capacity in cubic meters (optional)
        skills: Updated list of driver skills/certifications (optional)
        current_lat: Updated current latitude (optional)
        current_lng: Updated current longitude (optional)

    Returns:
        dict: {
            success: bool,
            driver_id: str,
            updated_fields: list[str],
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_update_driver
    logger.info(f"Tool: update_driver(driver_id='{driver_id}')")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('update_driver')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # Only fields the caller explicitly supplied are forwarded, so omitted
    # fields (left as None) never reach the handler.
    optional_fields = (
        ("name", name),
        ("phone", phone),
        ("email", email),
        ("status", status),
        ("vehicle_type", vehicle_type),
        ("vehicle_plate", vehicle_plate),
        ("capacity_kg", capacity_kg),
        ("capacity_m3", capacity_m3),
        ("skills", skills),
        ("current_lat", current_lat),
        ("current_lng", current_lng),
    )
    changes = {"driver_id": driver_id}
    changes.update(
        (field, value) for field, value in optional_fields if value is not None
    )
    return handle_update_driver(changes, user_id=caller['user_id'])
@mcp.tool()
def delete_driver(driver_id: str, confirm: bool) -> dict:
    """
    Permanently delete a driver from the database. This action cannot be
    undone. Use with caution.

    Args:
        driver_id: Driver ID to delete (e.g., 'DRV-20250114123456')
        confirm: Must be set to true to confirm deletion

    Returns:
        dict: {
            success: bool,
            driver_id: str,
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_delete_driver
    logger.info(f"Tool: delete_driver(driver_id='{driver_id}', confirm={confirm})")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('delete_driver')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # The confirm flag is passed through untouched; it is not checked here.
    request = {"driver_id": driver_id, "confirm": confirm}
    return handle_delete_driver(request, user_id=caller['user_id'])
@mcp.tool()
def delete_all_orders(confirm: bool, status: str | None = None) -> dict:
    """
    Bulk delete all orders (or orders with specific status).
    DANGEROUS - Use with extreme caution!

    Safety checks:
    - Requires confirm=true
    - Blocks deletion if any active assignments exist
    - Optional status filter to delete only specific statuses

    Args:
        confirm: Must be set to true to confirm bulk deletion
        status: Optional status filter
            (pending/assigned/in_transit/delivered/failed/cancelled)

    Returns:
        dict: {
            success: bool,
            deleted_count: int,
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    # `status` is annotated `str | None` (not a bare `str` with a None
    # default) so the advertised tool schema accepts an explicit null, in
    # line with the other optional tool parameters in this module.
    from chat.tools import handle_delete_all_orders
    logger.info(f"Tool: delete_all_orders(confirm={confirm}, status='{status}')")

    # STEP 1: Authenticate user
    user = get_authenticated_user()
    if not user:
        return {"success": False, "error": "Authentication required"}

    # STEP 2: Check permissions
    required_scope = get_required_scope('delete_all_orders')
    if not check_permission(user.get('scopes', []), required_scope):
        return {"success": False, "error": "Permission denied"}

    # STEP 3: Execute with user_id. `status` is forwarded even when None.
    return handle_delete_all_orders(
        {"confirm": confirm, "status": status},
        user_id=user['user_id'],
    )
@mcp.tool()
def delete_all_drivers(confirm: bool, status: str | None = None) -> dict:
    """
    Bulk delete all drivers (or drivers with specific status).
    DANGEROUS - Use with extreme caution!

    Safety checks:
    - Requires confirm=true
    - Blocks deletion if ANY assignments exist (due to RESTRICT constraint)
    - Optional status filter to delete only specific statuses

    Args:
        confirm: Must be set to true to confirm bulk deletion
        status: Optional status filter (active/busy/offline/unavailable)

    Returns:
        dict: {
            success: bool,
            deleted_count: int,
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    # `status` is annotated `str | None` (not a bare `str` with a None
    # default) so the advertised tool schema accepts an explicit null, in
    # line with the other optional tool parameters in this module.
    from chat.tools import handle_delete_all_drivers
    logger.info(f"Tool: delete_all_drivers(confirm={confirm}, status='{status}')")

    # STEP 1: Authenticate user
    user = get_authenticated_user()
    if not user:
        return {"success": False, "error": "Authentication required"}

    # STEP 2: Check permissions
    required_scope = get_required_scope('delete_all_drivers')
    if not check_permission(user.get('scopes', []), required_scope):
        return {"success": False, "error": "Permission denied"}

    # STEP 3: Execute with user_id. `status` is forwarded even when None.
    return handle_delete_all_drivers(
        {"confirm": confirm, "status": status},
        user_id=user['user_id'],
    )
# ============================================================================
# ASSIGNMENT TOOLS
# ============================================================================

@mcp.tool()
def create_assignment(order_id: str, driver_id: str) -> dict:
    """
    Assign an order to a driver. Creates an assignment record with route data
    from driver location to delivery location.

    Requirements:
    - Order must be in 'pending' status
    - Driver must be in 'active' or 'available' status
    - Order cannot already have an active assignment

    After assignment:
    - Order status changes to 'assigned'
    - Driver status changes to 'busy'
    - Route data (distance, duration, path) is calculated and saved
    - Assignment record is created with all route details

    Args:
        order_id: Order ID to assign (e.g., 'ORD-20250114123456')
        driver_id: Driver ID to assign (e.g., 'DRV-20250114123456')

    Returns:
        dict: {
            success: bool,
            assignment_id: str,
            order_id: str,
            driver_id: str,
            route: {
                distance: {meters: int, text: str},
                duration: {seconds: int, text: str},
                route_summary: str,
                driver_start: {lat: float, lng: float},
                delivery_location: {lat: float, lng: float, address: str}
            }
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_create_assignment
    logger.info(f"Tool: create_assignment(order_id='{order_id}', driver_id='{driver_id}')")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('create_assignment')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # Delegate the pairing, scoped to the calling user.
    pairing = {"order_id": order_id, "driver_id": driver_id}
    return handle_create_assignment(pairing, user_id=caller['user_id'])
@mcp.tool()
def auto_assign_order(order_id: str) -> dict:
    """
    Automatically assign order to the nearest available driver
    (distance + validation based).

    Selection Criteria (Auto Algorithm):
    1. Driver must be 'active' with valid location
    2. Driver vehicle capacity must meet package weight/volume requirements
    3. Driver must have required skills (fragile handling, cold storage, etc.)
    4. Selects nearest driver by real-time route distance

    This is a fixed-rule algorithm that prioritizes proximity while ensuring
    the driver has the necessary capacity and skills for the delivery.

    After assignment:
    - Order status changes to 'assigned'
    - Driver status changes to 'busy'
    - Route data (distance, duration, path) is calculated and saved
    - Assignment record is created with all route details

    Args:
        order_id: Order ID to auto-assign (e.g., 'ORD-20250114123456')

    Returns:
        dict: {
            success: bool,
            assignment_id: str,
            method: 'auto_assignment',
            order_id: str,
            driver_id: str,
            driver_name: str,
            driver_phone: str,
            driver_vehicle_type: str,
            selection_reason: str,
            distance_km: float,
            distance_meters: int,
            estimated_duration_minutes: float,
            candidates_evaluated: int,
            suitable_candidates: int,
            route_summary: str,
            estimated_arrival: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_auto_assign_order
    logger.info(f"Tool: auto_assign_order(order_id='{order_id}')")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('auto_assign_order')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # Driver selection happens entirely inside the handler.
    request = {"order_id": order_id}
    return handle_auto_assign_order(request, user_id=caller['user_id'])
@mcp.tool()
def intelligent_assign_order(order_id: str) -> dict:
    """
    Intelligently assign order using Google Gemini 2.0 Flash AI to analyze all
    parameters and select the best driver.

    Uses Gemini 2.0 Flash (latest model) to evaluate:
    - Order characteristics (priority, weight, fragility, time constraints, value)
    - Driver capabilities (location, capacity, skills, vehicle type)
    - Real-time routing data (distance, traffic delays, tolls)
    - Weather conditions and impact on delivery
    - Complex tradeoffs and optimal matching

    The AI considers multiple factors holistically:
    - Distance efficiency vs skill requirements
    - Capacity utilization vs delivery urgency
    - Traffic conditions vs time constraints
    - Weather safety vs speed requirements
    - Cost efficiency (tolls, fuel) vs customer satisfaction

    Returns assignment with detailed AI reasoning explaining why the selected
    driver is the best match for this specific delivery.

    Requirements:
    - GOOGLE_API_KEY environment variable must be set
    - Order must be in 'pending' status
    - At least one active driver with valid location

    After assignment:
    - Order status changes to 'assigned'
    - Driver status changes to 'busy'
    - Route data (distance, duration, path) is calculated and saved
    - Assignment record is created with all route details
    - AI reasoning is returned for transparency

    Args:
        order_id: Order ID to intelligently assign
            (e.g., 'ORD-20250114123456')

    Returns:
        dict: {
            success: bool,
            assignment_id: str,
            method: 'intelligent_assignment',
            ai_provider: 'Google Gemini 2.0 Flash',
            order_id: str,
            driver_id: str,
            driver_name: str,
            driver_phone: str,
            driver_vehicle_type: str,
            distance_km: float,
            estimated_duration_minutes: float,
            ai_reasoning: {
                primary_factors: [str],
                trade_offs_considered: [str],
                risk_assessment: str,
                decision_summary: str
            },
            confidence_score: float,
            alternatives_considered: [{driver_id: str, reason_not_selected: str}],
            candidates_evaluated: int,
            route_summary: str,
            estimated_arrival: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_intelligent_assign_order
    logger.info(f"Tool: intelligent_assign_order(order_id='{order_id}')")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('intelligent_assign_order')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # All AI evaluation happens inside the handler; this wrapper only
    # forwards the order ID on behalf of the calling user.
    request = {"order_id": order_id}
    return handle_intelligent_assign_order(request, user_id=caller['user_id'])
@mcp.tool()
def get_assignment_details(
    assignment_id: str | None = None,
    order_id: str | None = None,
    driver_id: str | None = None
) -> dict:
    """
    Get assignment details by assignment ID, order ID, or driver ID.
    Provide at least one parameter to search.

    Args:
        assignment_id: Assignment ID (e.g., 'ASN-20250114123456')
        order_id: Order ID to find assignments for
            (e.g., 'ORD-20250114123456')
        driver_id: Driver ID to find assignments for
            (e.g., 'DRV-20250114123456')

    Returns:
        dict: {
            success: bool,
            assignments: [
                {
                    assignment_id: str,
                    order_id: str,
                    driver_id: str,
                    customer_name: str,
                    driver_name: str,
                    status: str,
                    route_distance_meters: int,
                    route_duration_seconds: int,
                    route_summary: str,
                    driver_start_location: {lat: float, lng: float},
                    delivery_location: {lat: float, lng: float, address: str},
                    estimated_arrival: str,
                    assigned_at: str,
                    updated_at: str
                }
            ]
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    # The three lookup keys are annotated `str | None` (not a bare `str`
    # with a None default) so the tool schema accepts an explicit null,
    # matching the other optional tool parameters in this module.
    from chat.tools import handle_get_assignment_details
    logger.info(f"Tool: get_assignment_details(assignment_id='{assignment_id}', order_id='{order_id}', driver_id='{driver_id}')")

    # STEP 1: Authenticate user
    user = get_authenticated_user()
    if not user:
        return {"success": False, "error": "Authentication required"}

    # STEP 2: Check permissions
    required_scope = get_required_scope('get_assignment_details')
    if not check_permission(user.get('scopes', []), required_scope):
        return {"success": False, "error": "Permission denied"}

    # STEP 3: Execute with user_id. All three keys are forwarded, including
    # those left as None; this wrapper does not enforce "at least one".
    return handle_get_assignment_details({
        "assignment_id": assignment_id,
        "order_id": order_id,
        "driver_id": driver_id
    }, user_id=user['user_id'])


@mcp.tool()
def update_assignment(
    assignment_id: str,
    status: str | None = None,
    actual_arrival: str | None = None,
    actual_distance_meters: int | None = None,
    notes: str | None = None
) -> dict:
    """
    Update assignment status or details.

    Valid status transitions:
    - active → in_progress (driver starts delivery)
    - in_progress → completed (delivery successful)
    - in_progress → failed (delivery failed)
    - active/in_progress → cancelled (assignment cancelled)

    Cascading updates:
    - completed: order status → 'delivered', driver checks for other assignments
    - failed: order status → 'failed', driver checks for other assignments
    - cancelled: order status → 'cancelled', order.assigned_driver_id → NULL,
      driver → 'active' if no other assignments

    Args:
        assignment_id: Assignment ID to update (e.g., 'ASN-20250114123456')
        status: New status (active, in_progress, completed, failed, cancelled)
        actual_arrival: Actual arrival timestamp (ISO format)
        actual_distance_meters: Actual distance traveled in meters
        notes: Additional notes about the assignment

    Returns:
        dict: {
            success: bool,
            assignment_id: str,
            updated_fields: list,
            cascading_actions: list,
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    # Optional parameters are annotated `X | None` (not a bare type with a
    # None default) so the tool schema accepts an explicit null, matching
    # the other optional tool parameters in this module.
    from chat.tools import handle_update_assignment
    logger.info(f"Tool: update_assignment(assignment_id='{assignment_id}', status='{status}')")

    # STEP 1: Authenticate user
    user = get_authenticated_user()
    if not user:
        return {"success": False, "error": "Authentication required"}

    # STEP 2: Check permissions
    required_scope = get_required_scope('update_assignment')
    if not check_permission(user.get('scopes', []), required_scope):
        return {"success": False, "error": "Permission denied"}

    # STEP 3: Execute with user_id. Every field is forwarded, including
    # those left as None.
    return handle_update_assignment({
        "assignment_id": assignment_id,
        "status": status,
        "actual_arrival": actual_arrival,
        "actual_distance_meters": actual_distance_meters,
        "notes": notes
    }, user_id=user['user_id'])
@mcp.tool()
def unassign_order(assignment_id: str, confirm: bool = False) -> dict:
    """
    Unassign an order from a driver by deleting the assignment.

    Requirements:
    - Assignment cannot be in 'in_progress' status (must cancel first using
      update_assignment)
    - Requires confirm=true to proceed

    Effects:
    - Assignment is deleted
    - Order status changes back to 'pending'
    - order.assigned_driver_id is set to NULL
    - Driver status changes to 'active' (if no other assignments)

    Args:
        assignment_id: Assignment ID to unassign (e.g., 'ASN-20250114123456')
        confirm: Must be set to true to confirm unassignment

    Returns:
        dict: {
            success: bool,
            assignment_id: str,
            order_id: str,
            driver_id: str,
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    from chat.tools import handle_unassign_order
    logger.info(f"Tool: unassign_order(assignment_id='{assignment_id}', confirm={confirm})")

    # Authentication gate.
    caller = get_authenticated_user()
    if not caller:
        return {"success": False, "error": "Authentication required"}

    # Authorization gate.
    scope = get_required_scope('unassign_order')
    if not check_permission(caller.get('scopes', []), scope):
        return {"success": False, "error": "Permission denied"}

    # The confirm flag is passed through untouched; it is not checked here.
    request = {"assignment_id": assignment_id, "confirm": confirm}
    return handle_unassign_order(request, user_id=caller['user_id'])
@mcp.tool()
def complete_delivery(
    assignment_id: str,
    confirm: bool,
    actual_distance_meters: int | None = None,
    notes: str | None = None
) -> dict:
    """
    Mark a delivery as successfully completed and automatically update driver
    location to delivery address.

    This is the primary tool for completing deliveries. It handles all
    necessary updates:
    - Marks assignment as 'completed' with timestamp
    - Updates order status to 'delivered'
    - **Automatically moves driver location to the delivery address**
    - Updates driver status to 'active' (if no other assignments)
    - Records actual distance and notes (optional)

    Requirements:
    - Assignment must be in 'active' or 'in_progress' status
    - Delivery location coordinates must exist
    - Requires confirm=true

    For failed deliveries: Use fail_delivery tool instead.

    Args:
        assignment_id: Assignment ID to complete (e.g., 'ASN-20250114123456')
        confirm: Must be set to true to confirm completion
        actual_distance_meters: Optional actual distance traveled in meters
        notes: Optional completion notes

    Returns:
        dict: {
            success: bool,
            assignment_id: str,
            order_id: str,
            driver_id: str,
            customer_name: str,
            driver_name: str,
            completed_at: str (ISO timestamp),
            delivery_location: {lat, lng, address},
            driver_updated: {new_location, location_updated_at},
            cascading_actions: list[str],
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    # Optional parameters are annotated `X | None` (not a bare type with a
    # None default) so the tool schema accepts an explicit null, matching
    # the other optional tool parameters in this module.
    from chat.tools import handle_complete_delivery
    logger.info(f"Tool: complete_delivery(assignment_id='{assignment_id}', confirm={confirm})")

    # STEP 1: Authenticate user
    user = get_authenticated_user()
    if not user:
        return {"success": False, "error": "Authentication required"}

    # STEP 2: Check permissions
    required_scope = get_required_scope('complete_delivery')
    if not check_permission(user.get('scopes', []), required_scope):
        return {"success": False, "error": "Permission denied"}

    # STEP 3: Execute with user_id. Optional fields are forwarded even when
    # left as None.
    return handle_complete_delivery({
        "assignment_id": assignment_id,
        "confirm": confirm,
        "actual_distance_meters": actual_distance_meters,
        "notes": notes
    }, user_id=user['user_id'])
@mcp.tool()
def fail_delivery(
    assignment_id: str,
    current_lat: float,
    current_lng: float,
    failure_reason: str,
    confirm: bool,
    notes: str | None = None
) -> dict:
    """
    Mark a delivery as failed with mandatory driver location and failure
    reason.

    IMPORTANT: Driver MUST provide their current GPS location and a valid
    failure reason. This ensures accurate location tracking and proper
    failure documentation.

    Handles all necessary updates:
    - Marks assignment as 'failed' with timestamp
    - Updates order status to 'failed'
    - **Updates driver location to the reported current position**
    - Updates driver status to 'active' (if no other assignments)
    - Records structured failure reason and optional notes

    Valid failure reasons:
    - customer_not_available: Customer not present or not reachable
    - wrong_address: Incorrect or invalid delivery address
    - refused_delivery: Customer refused to accept delivery
    - damaged_goods: Package damaged during transit
    - payment_issue: Payment problems (for COD orders)
    - vehicle_breakdown: Driver's vehicle broke down
    - access_restricted: Cannot access delivery location
    - weather_conditions: Severe weather preventing delivery
    - other: Other reasons (provide details in notes)

    Requirements:
    - Assignment must be in 'active' or 'in_progress' status
    - Driver must provide current GPS coordinates
    - Must provide a valid failure_reason from the list above
    - Requires confirm=true

    Args:
        assignment_id: Assignment ID to mark as failed
            (e.g., 'ASN-20250114123456')
        current_lat: Driver's current latitude (-90 to 90)
        current_lng: Driver's current longitude (-180 to 180)
        failure_reason: Reason for failure (must be from valid list)
        confirm: Must be set to true to confirm failure
        notes: Optional additional details about the failure

    Returns:
        dict: {
            success: bool,
            assignment_id: str,
            order_id: str,
            driver_id: str,
            customer_name: str,
            driver_name: str,
            failed_at: str (ISO timestamp),
            failure_reason: str,
            failure_reason_display: str (human-readable),
            delivery_address: str,
            driver_location: {lat, lng, updated_at},
            cascading_actions: list[str],
            message: str
        }
        When the caller is not authenticated or lacks the required scope,
        the dict is {success: False, error: str} instead.
    """
    # `notes` is annotated `str | None` (not a bare `str` with a None
    # default) so the tool schema accepts an explicit null, matching the
    # other optional tool parameters in this module.
    from chat.tools import handle_fail_delivery
    logger.info(f"Tool: fail_delivery(assignment_id='{assignment_id}', reason='{failure_reason}')")

    # STEP 1: Authenticate user
    user = get_authenticated_user()
    if not user:
        return {"success": False, "error": "Authentication required"}

    # STEP 2: Check permissions
    required_scope = get_required_scope('fail_delivery')
    if not check_permission(user.get('scopes', []), required_scope):
        return {"success": False, "error": "Permission denied"}

    # STEP 3: Execute with user_id. Coordinates and failure_reason are
    # forwarded as given; this wrapper does not range-check or validate them.
    return handle_fail_delivery({
        "assignment_id": assignment_id,
        "current_lat": current_lat,
        "current_lng": current_lng,
        "failure_reason": failure_reason,
        "confirm": confirm,
        "notes": notes
    }, user_id=user['user_id'])
# ============================================================================
# MAIN ENTRY POINT
# ============================================================================

if __name__ == "__main__":
    # Startup banner: a title framed by two separator rules.
    separator = "=" * 60
    for heading in (separator, "FleetMind MCP Server v1.0.0", separator):
        logger.info(heading)

    # Queried only after the banner is out, so a failure here still leaves
    # the banner in the log.
    geocoding_status = geocoding_service.get_status()
    logger.info(f"Geocoding: {geocoding_status}")

    # NOTE(review): the counts below are hard-coded strings, not derived from
    # what is actually registered on `mcp` — confirm they are still accurate.
    for summary in (
        "Tools: 27 tools registered (19 core + 6 assignment + 2 bulk delete)",
        "Resources: 2 resources available",
        "Prompts: 3 workflow templates",
        "Starting MCP server...",
    ):
        logger.info(summary)

    # Hand control to FastMCP; run() is called with its default arguments.
    mcp.run()