""" API Key Authentication System for FleetMind MCP Server Simple API key management for multi-tenant authentication without OAuth complexity. Works with Claude Desktop and mcp-remote today! Usage: 1. User generates API key via web interface or CLI 2. User adds API key to Claude Desktop config 3. MCP server validates key and returns user_id 4. Multi-tenant isolation works automatically """ import os import secrets import hashlib from datetime import datetime from typing import Optional, Dict from database.connection import get_db_connection def create_api_keys_table(): """Create api_keys table if it doesn't exist""" conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" CREATE TABLE IF NOT EXISTS api_keys ( key_id SERIAL PRIMARY KEY, user_id VARCHAR(100) NOT NULL, email VARCHAR(255) NOT NULL, name VARCHAR(255), api_key_hash VARCHAR(64) NOT NULL UNIQUE, api_key_prefix VARCHAR(20) NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_used_at TIMESTAMP, is_active BOOLEAN DEFAULT true, UNIQUE(user_id) ) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_api_keys_hash ON api_keys(api_key_hash) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_api_keys_user ON api_keys(user_id) """) conn.commit() cursor.close() conn.close() def generate_api_key(email: str, name: str = None) -> Dict[str, str]: """ Generate a new API key for a user Args: email: User's email address (used as identifier) name: User's display name (optional) Returns: dict with api_key (show once!), user_id, email, name """ # Generate secure random API key api_key = f"fm_{secrets.token_urlsafe(32)}" # fm_ prefix for FleetMind # Hash the API key for storage (never store plain text!) api_key_hash = hashlib.sha256(api_key.encode()).hexdigest() # Store prefix for display (first 12 chars) api_key_prefix = api_key[:12] # Generate user_id from email user_id = f"user_{hashlib.md5(email.encode()).hexdigest()[:12]}" conn = get_db_connection() cursor = conn.cursor() try: # Check if user already has a key cursor.execute("SELECT user_id FROM api_keys WHERE email = %s", (email,)) existing = cursor.fetchone() if existing: cursor.close() conn.close() return { "success": False, "error": "User already has an API key. Revoke the old key first." } # Insert new API key cursor.execute(""" INSERT INTO api_keys (user_id, email, name, api_key_hash, api_key_prefix) VALUES (%s, %s, %s, %s, %s) RETURNING user_id, email, name, created_at """, (user_id, email, name, api_key_hash, api_key_prefix)) result = cursor.fetchone() conn.commit() if not result: raise Exception("Failed to insert API key") # Unpack the result tuple ret_user_id, ret_email, ret_name, ret_created_at = result return { "success": True, "api_key": api_key, # SHOW THIS ONCE! Never displayed again "user_id": ret_user_id, "email": ret_email, "name": ret_name or "FleetMind User", "created_at": str(ret_created_at) if ret_created_at else "", "message": "⚠️ IMPORTANT: Save this API key now! It won't be shown again." } except Exception as e: conn.rollback() import traceback error_details = traceback.format_exc() print(f"API Key Generation Error: {e}") print(f"Error details: {error_details}") return { "success": False, "error": f"Failed to generate API key: {str(e)}" } finally: cursor.close() conn.close() def verify_api_key(api_key: str) -> Optional[Dict[str, str]]: """ Verify API key and return user info Args: api_key: The API key to verify Returns: User info dict if valid, None if invalid """ if not api_key or not api_key.startswith("fm_"): return None # Hash the provided key api_key_hash = hashlib.sha256(api_key.encode()).hexdigest() conn = get_db_connection() cursor = conn.cursor() try: # Look up the key cursor.execute(""" SELECT user_id, email, name, is_active FROM api_keys WHERE api_key_hash = %s """, (api_key_hash,)) result = cursor.fetchone() if not result: return None # Access RealDictRow fields by key (not tuple unpacking!) user_id = result['user_id'] email = result['email'] name = result['name'] is_active = result['is_active'] if not is_active: return None # Update last_used_at cursor.execute(""" UPDATE api_keys SET last_used_at = CURRENT_TIMESTAMP WHERE api_key_hash = %s """, (api_key_hash,)) conn.commit() return { 'user_id': user_id, 'email': email, 'name': name or 'FleetMind User', 'scopes': ['orders:read', 'orders:write', 'drivers:read', 'drivers:write', 'assignments:manage'] } except Exception as e: print(f"API key verification error: {e}") return None finally: cursor.close() conn.close() def list_api_keys() -> list: """List all API keys (without showing actual keys)""" conn = get_db_connection() cursor = conn.cursor() cursor.execute(""" SELECT user_id, email, name, api_key_prefix, created_at, last_used_at, is_active FROM api_keys ORDER BY created_at DESC """) keys = [] for row in cursor.fetchall(): keys.append({ 'user_id': row[0], 'email': row[1], 'name': row[2], 'key_preview': f"{row[3]}...", 'created_at': row[4].isoformat(), 'last_used_at': row[5].isoformat() if row[5] else None, 'is_active': row[6] }) cursor.close() conn.close() return keys def revoke_api_key(email: str) -> Dict[str, any]: """Revoke (deactivate) an API key""" conn = get_db_connection() cursor = conn.cursor() try: cursor.execute(""" UPDATE api_keys SET is_active = false WHERE email = %s RETURNING user_id, email """, (email,)) result = cursor.fetchone() conn.commit() if result: return { "success": True, "message": f"API key revoked for {result[1]}" } else: return { "success": False, "error": "No API key found for this email" } except Exception as e: conn.rollback() return { "success": False, "error": f"Failed to revoke key: {str(e)}" } finally: cursor.close() conn.close() # Initialize table on import try: create_api_keys_table() except: pass # Table might already exist