"""
User Context and Authentication Module
Handles Stytch authentication and user permission checks
"""

from typing import Optional, Dict, Any
import os

from stytch import Client

# Initialize Stytch client.
# The client is only built when both credentials are present in the
# environment. If they are missing, or construction fails, it stays None and
# verify_token() rejects every token (outside the development bypass).
stytch_client = None
if os.getenv('STYTCH_PROJECT_ID') and os.getenv('STYTCH_SECRET'):
    try:
        stytch_client = Client(
            project_id=os.getenv('STYTCH_PROJECT_ID'),
            secret=os.getenv('STYTCH_SECRET'),
        )
        # Only a prefix of the project id is printed.
        print(f"Stytch client initialized with project: {os.getenv('STYTCH_PROJECT_ID')[:20]}...")
    except Exception as e:
        # Best effort: a failed init is reported but does not abort import.
        print(f"Warning: Stytch client initialization failed: {e}")


def verify_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Verify authentication token and return user info

    Args:
        token: Bearer token from Authorization header

    Returns:
        User info dict with user_id, email, scopes, name
        None if the token is empty or invalid, or if the Stytch client
        is not initialized
    """
    # Development mode: Skip authentication. Both environment variables must
    # be set; the returned user carries the 'admin' scope.
    if os.getenv('ENVIRONMENT') == 'development' and os.getenv('SKIP_AUTH') == 'true':
        return {
            'user_id': 'dev-user',
            'email': 'dev@fleetmind.local',
            'scopes': ['admin'],
            'name': 'Development User',
        }

    if not stytch_client:
        print("Error: Stytch client not initialized")
        return None

    if not token:
        return None

    try:
        # Verify session token with Stytch
        result = stytch_client.sessions.authenticate(session_token=token)
        user = result.user

        # custom_claims can be None when the session carries no custom
        # claims. Without this guard the .get() below raises AttributeError,
        # which the broad except turns into a rejected (but valid) token
        # instead of applying the default read-only scopes.
        claims = result.session.custom_claims or {}

        # Extract user information
        return {
            'user_id': user.user_id,
            'email': user.emails[0].email if user.emails else 'unknown',
            'scopes': claims.get('scopes', ['orders:read', 'drivers:read']),
            # NOTE(review): the value is passed through as returned by the
            # SDK; confirm whether callers expect a plain string here.
            'name': getattr(user, 'name', 'Unknown User'),
        }
    except Exception as e:
        # Fail closed: any error during validation means "not authenticated".
        print(f"Token validation failed: {e}")
        return None


def check_permission(user_scopes: list, required_scope: str) -> bool:
    """
    Check if user has required permission

    Args:
        user_scopes: List of scopes user has. None or an empty collection
            means "no scopes". A plain string is treated as a
            whitespace-separated list of scopes.
        required_scope: Scope needed for this operation

    Returns:
        True if user has permission

    Note:
        A required_scope of None is not special-cased: it is looked up like
        any other value, so only 'admin' users pass. Callers should handle
        tools whose requirement is None before calling this function.
    """
    # No scopes at all (None, empty list, empty string): deny rather than
    # raising TypeError on the membership tests below.
    if not user_scopes:
        return False

    # `in` on a str is a substring test, which would let e.g. 'nonadmin'
    # satisfy the 'admin' check. Split into whole scope tokens first.
    if isinstance(user_scopes, str):
        user_scopes = user_scopes.split()

    # Admin has all permissions
    if 'admin' in user_scopes:
        return True

    # Check specific scope
    return required_scope in user_scopes


# Scope requirements for each tool.
# A value of None marks a tool that needs no scope.
SCOPE_REQUIREMENTS: Dict[str, Optional[str]] = {
    # Order operations
    'create_order': 'orders:write',
    'fetch_orders': 'orders:read',
    'update_order': 'orders:write',
    'delete_order': 'orders:write',
    'search_orders': 'orders:read',
    'get_order_details': 'orders:read',
    'count_orders': 'orders:read',
    'get_incomplete_orders': 'orders:read',

    # Driver operations
    'create_driver': 'drivers:write',
    'fetch_drivers': 'drivers:read',
    'update_driver': 'drivers:write',
    'delete_driver': 'drivers:write',
    'search_drivers': 'drivers:read',
    'get_driver_details': 'drivers:read',
    'count_drivers': 'drivers:read',
    'get_available_drivers': 'drivers:read',

    # Assignment operations
    'create_assignment': 'assignments:manage',
    'auto_assign_order': 'assignments:manage',
    'intelligent_assign_order': 'assignments:manage',
    'get_assignment_details': 'assignments:manage',
    'update_assignment': 'assignments:manage',
    'unassign_order': 'assignments:manage',
    'complete_delivery': 'assignments:manage',
    'fail_delivery': 'assignments:manage',

    # Routing (public - no scope required)
    'geocode_address': None,
    'calculate_route': None,
    'calculate_intelligent_route': None,

    # Dangerous operations (admin only)
    'delete_all_orders': 'admin',
    'delete_all_drivers': 'admin',
}


def get_required_scope(tool_name: str) -> Optional[str]:
    """
    Get the scope required for a tool

    Args:
        tool_name: Name of the tool

    Returns:
        Required scope, or None if tool is public. Tools that are not
        listed in SCOPE_REQUIREMENTS require 'admin'.
    """
    # Unknown tools default to 'admin' so an unregistered tool is never
    # accidentally treated as public.
    return SCOPE_REQUIREMENTS.get(tool_name, 'admin')