|
|
""" |
|
|
User Context and Authentication Module |
|
|
Handles Stytch authentication and user permission checks |
|
|
""" |
|
|
|
|
|
from typing import Optional, Dict, Any |
|
|
import os |
|
|
from stytch import Client |
|
|
|
|
|
|
|
|
# Module-wide Stytch client. It stays None when either credential is missing
# from the environment or when constructing the client raises.
stytch_client = None

if all(os.getenv(var) for var in ('STYTCH_PROJECT_ID', 'STYTCH_SECRET')):
    try:
        stytch_client = Client(
            project_id=os.getenv('STYTCH_PROJECT_ID'),
            secret=os.getenv('STYTCH_SECRET'),
        )
        # Only the first 20 characters of the project id are echoed.
        print(f"Stytch client initialized with project: {os.getenv('STYTCH_PROJECT_ID')[:20]}...")
    except Exception as exc:
        # Best-effort start-up: report the failure and leave the client unset.
        print(f"Warning: Stytch client initialization failed: {exc}")
|
|
|
|
|
|
|
|
def verify_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Verify an authentication token and return the user's info.

    Args:
        token: Bearer token from Authorization header; passed to Stytch as
            the session token

    Returns:
        User info dict with user_id, email, scopes and name.
        None if the Stytch client is not initialized, the token is empty,
        or authentication fails for any reason.
    """
    # Development bypass: skips Stytch entirely and returns a fixed admin
    # user. Both environment variables must match exactly.
    if os.getenv('ENVIRONMENT') == 'development' and os.getenv('SKIP_AUTH') == 'true':
        return {
            'user_id': 'dev-user',
            # NOTE(review): this literal looks like an e-mail redaction
            # artifact; confirm the intended development address.
            'email': '[email protected]',
            'scopes': ['admin'],
            'name': 'Development User'
        }

    if not stytch_client:
        print("Error: Stytch client not initialized")
        return None

    if not token:
        return None

    try:
        result = stytch_client.sessions.authenticate(session_token=token)

        user = result.user

        # custom_claims may be None when the session carries no custom
        # claims. Calling .get() on it directly raised AttributeError, which
        # the handler below reported as a failed validation for an otherwise
        # valid token. Fall back to an empty mapping so the default scopes
        # apply instead.
        claims = result.session.custom_claims or {}

        return {
            'user_id': user.user_id,
            'email': user.emails[0].email if user.emails else 'unknown',
            # Sessions without an explicit 'scopes' claim get read-only access.
            'scopes': claims.get('scopes', ['orders:read', 'drivers:read']),
            'name': getattr(user, 'name', 'Unknown User')
        }

    except Exception as e:
        # Fail closed: any error during authentication means "not verified".
        print(f"Token validation failed: {e}")
        return None
|
|
|
|
|
|
|
|
def check_permission(user_scopes: list, required_scope: Optional[str]) -> bool:
    """
    Check if user has required permission.

    Args:
        user_scopes: Scopes the user holds. Normally a list; a single
            whitespace-separated string is also accepted and split into
            individual scopes. None or empty means "no scopes".
        required_scope: Scope needed for this operation, or None when the
            operation is public and needs no scope.

    Returns:
        True if the operation is public, the user holds the 'admin' scope,
        or the user holds the required scope; False otherwise.
    """
    # None is how SCOPE_REQUIREMENTS marks public tools: nothing to check.
    if required_scope is None:
        return True

    # Missing or empty scopes grant nothing (deny instead of raising on None).
    if not user_scopes:
        return False

    # A bare string would make `in` a substring test ('admin' would match
    # 'administrator'), so split it into whole scope names first.
    granted = user_scopes.split() if isinstance(user_scopes, str) else user_scopes

    # 'admin' grants every operation.
    return 'admin' in granted or required_scope in granted
|
|
|
|
|
|
|
|
|
|
|
# Maps each tool name to the scope a caller must hold to use it.
# A value of None marks a tool that needs no scope at all.
SCOPE_REQUIREMENTS: Dict[str, Optional[str]] = {
    # --- Orders ---
    'create_order': 'orders:write',
    'fetch_orders': 'orders:read',
    'update_order': 'orders:write',
    'delete_order': 'orders:write',
    'search_orders': 'orders:read',
    'get_order_details': 'orders:read',
    'count_orders': 'orders:read',
    'get_incomplete_orders': 'orders:read',

    # --- Drivers ---
    'create_driver': 'drivers:write',
    'fetch_drivers': 'drivers:read',
    'update_driver': 'drivers:write',
    'delete_driver': 'drivers:write',
    'search_drivers': 'drivers:read',
    'get_driver_details': 'drivers:read',
    'count_drivers': 'drivers:read',
    'get_available_drivers': 'drivers:read',

    # --- Assignments (every operation, reads included, uses one scope) ---
    'create_assignment': 'assignments:manage',
    'auto_assign_order': 'assignments:manage',
    'intelligent_assign_order': 'assignments:manage',
    'get_assignment_details': 'assignments:manage',
    'update_assignment': 'assignments:manage',
    'unassign_order': 'assignments:manage',
    'complete_delivery': 'assignments:manage',
    'fail_delivery': 'assignments:manage',

    # --- No scope required ---
    'geocode_address': None,
    'calculate_route': None,
    'calculate_intelligent_route': None,

    # --- Bulk deletes: admin only ---
    'delete_all_orders': 'admin',
    'delete_all_drivers': 'admin',
}
|
|
|
|
|
|
|
|
def get_required_scope(tool_name: str) -> Optional[str]:
    """
    Look up the scope a tool requires.

    Args:
        tool_name: Name of the tool

    Returns:
        The scope registered for the tool in SCOPE_REQUIREMENTS (None when
        the tool is public), or 'admin' when the tool is not registered
    """
    try:
        return SCOPE_REQUIREMENTS[tool_name]
    except KeyError:
        # Unregistered tools fall back to the 'admin' scope.
        return 'admin'
|
|
|