fleetmind-dispatch-ai / database /user_context.py
mashrur950's picture
added authentication
d965a0a
raw
history blame
4.12 kB
"""
User Context and Authentication Module
Handles Stytch authentication and user permission checks
"""
from typing import Optional, Dict, Any
import os
from stytch import Client
# Initialize Stytch client
stytch_client = None
if os.getenv('STYTCH_PROJECT_ID') and os.getenv('STYTCH_SECRET'):
try:
stytch_client = Client(
project_id=os.getenv('STYTCH_PROJECT_ID'),
secret=os.getenv('STYTCH_SECRET')
)
print(f"Stytch client initialized with project: {os.getenv('STYTCH_PROJECT_ID')[:20]}...")
except Exception as e:
print(f"Warning: Stytch client initialization failed: {e}")
def verify_token(token: str) -> Optional[Dict[str, Any]]:
"""
Verify authentication token and return user info
Args:
token: Bearer token from Authorization header
Returns:
User info dict with user_id, email, scopes
None if token is invalid
"""
# Development mode: Skip authentication
if os.getenv('ENVIRONMENT') == 'development' and os.getenv('SKIP_AUTH') == 'true':
return {
'user_id': 'dev-user',
'email': '[email protected]',
'scopes': ['admin'],
'name': 'Development User'
}
if not stytch_client:
print("Error: Stytch client not initialized")
return None
if not token:
return None
try:
# Verify session token with Stytch
result = stytch_client.sessions.authenticate(session_token=token)
# Extract user information
user_info = {
'user_id': result.user.user_id,
'email': result.user.emails[0].email if result.user.emails else 'unknown',
'scopes': result.session.custom_claims.get('scopes', ['orders:read', 'drivers:read']),
'name': result.user.name if hasattr(result.user, 'name') else 'Unknown User'
}
return user_info
except Exception as e:
print(f"Token validation failed: {e}")
return None
def check_permission(user_scopes: list, required_scope: str) -> bool:
"""
Check if user has required permission
Args:
user_scopes: List of scopes user has
required_scope: Scope needed for this operation
Returns:
True if user has permission
"""
# Admin has all permissions
if 'admin' in user_scopes:
return True
# Check specific scope
return required_scope in user_scopes
# Scope requirements for each tool
SCOPE_REQUIREMENTS = {
# Order operations
'create_order': 'orders:write',
'fetch_orders': 'orders:read',
'update_order': 'orders:write',
'delete_order': 'orders:write',
'search_orders': 'orders:read',
'get_order_details': 'orders:read',
'count_orders': 'orders:read',
'get_incomplete_orders': 'orders:read',
# Driver operations
'create_driver': 'drivers:write',
'fetch_drivers': 'drivers:read',
'update_driver': 'drivers:write',
'delete_driver': 'drivers:write',
'search_drivers': 'drivers:read',
'get_driver_details': 'drivers:read',
'count_drivers': 'drivers:read',
'get_available_drivers': 'drivers:read',
# Assignment operations
'create_assignment': 'assignments:manage',
'auto_assign_order': 'assignments:manage',
'intelligent_assign_order': 'assignments:manage',
'get_assignment_details': 'assignments:manage',
'update_assignment': 'assignments:manage',
'unassign_order': 'assignments:manage',
'complete_delivery': 'assignments:manage',
'fail_delivery': 'assignments:manage',
# Routing (public - no scope required)
'geocode_address': None,
'calculate_route': None,
'calculate_intelligent_route': None,
# Dangerous operations (admin only)
'delete_all_orders': 'admin',
'delete_all_drivers': 'admin',
}
def get_required_scope(tool_name: str) -> Optional[str]:
"""
Get the scope required for a tool
Args:
tool_name: Name of the tool
Returns:
Required scope or None if tool is public
"""
return SCOPE_REQUIREMENTS.get(tool_name, 'admin')