mashrur950's picture
Add order and driver management tools with update and delete functionalities
a7365ce
raw
history blame
43.6 kB
"""
FleetMind MCP - Gradio Web Interface
Enhanced 3-tab dashboard: Chat, Orders, Drivers
"""
import sys
from pathlib import Path
# Add parent directory to path
sys.path.insert(0, str(Path(__file__).parent.parent))
import gradio as gr
from database.connection import execute_query, execute_write, test_connection
from datetime import datetime, timedelta
import json
# Import chat functionality
from chat.chat_engine import ChatEngine
from chat.conversation import ConversationManager
from chat.geocoding import GeocodingService
import uuid
# Global session storage
SESSIONS = {}
# Initialize chat engine and geocoding service
chat_engine = ChatEngine()
geocoding_service = GeocodingService()
# ============================================
# STATISTICS FUNCTIONS
# ============================================
def get_orders_stats():
"""Get order statistics by status"""
try:
query = """
SELECT
COUNT(*) as total,
COUNT(CASE WHEN status = 'pending' THEN 1 END) as pending,
COUNT(CASE WHEN status = 'assigned' THEN 1 END) as assigned,
COUNT(CASE WHEN status = 'in_transit' THEN 1 END) as in_transit,
COUNT(CASE WHEN status = 'delivered' THEN 1 END) as delivered,
COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed,
COUNT(CASE WHEN status = 'cancelled' THEN 1 END) as cancelled
FROM orders
"""
result = execute_query(query)
if result:
return result[0]
return {"total": 0, "pending": 0, "assigned": 0, "in_transit": 0, "delivered": 0, "failed": 0, "cancelled": 0}
except Exception as e:
print(f"Error getting order stats: {e}")
return {"total": 0, "pending": 0, "assigned": 0, "in_transit": 0, "delivered": 0, "failed": 0, "cancelled": 0}
def get_drivers_stats():
"""Get driver statistics by status"""
try:
query = """
SELECT
COUNT(*) as total,
COUNT(CASE WHEN status = 'active' THEN 1 END) as active,
COUNT(CASE WHEN status = 'busy' THEN 1 END) as busy,
COUNT(CASE WHEN status = 'offline' THEN 1 END) as offline,
COUNT(CASE WHEN status = 'unavailable' THEN 1 END) as unavailable
FROM drivers
"""
result = execute_query(query)
if result:
return result[0]
return {"total": 0, "active": 0, "busy": 0, "offline": 0, "unavailable": 0}
except Exception as e:
print(f"Error getting driver stats: {e}")
return {"total": 0, "active": 0, "busy": 0, "offline": 0, "unavailable": 0}
# ============================================
# ORDERS FUNCTIONS
# ============================================
def get_all_orders(status_filter="all", priority_filter="all", payment_filter="all", search_term=""):
"""Get orders with filters"""
try:
where_clauses = []
params = []
if status_filter and status_filter != "all":
where_clauses.append("status = %s")
params.append(status_filter)
if priority_filter and priority_filter != "all":
where_clauses.append("priority = %s")
params.append(priority_filter)
if payment_filter and payment_filter != "all":
where_clauses.append("payment_status = %s")
params.append(payment_filter)
if search_term:
where_clauses.append("(order_id ILIKE %s OR customer_name ILIKE %s OR customer_phone ILIKE %s)")
search_pattern = f"%{search_term}%"
params.extend([search_pattern, search_pattern, search_pattern])
where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
query = f"""
SELECT
order_id,
customer_name,
delivery_address,
status,
priority,
assigned_driver_id,
time_window_end,
created_at
FROM orders
{where_sql}
ORDER BY created_at DESC
LIMIT 100
"""
results = execute_query(query, tuple(params) if params else ())
if not results:
return [["-", "-", "-", "-", "-", "-", "-"]]
# Format data for table
data = []
for row in results:
# Truncate address if too long
address = row['delivery_address']
if len(address) > 40:
address = address[:37] + "..."
# Format deadline
deadline = row['time_window_end'].strftime("%Y-%m-%d %H:%M") if row['time_window_end'] else "No deadline"
# Driver ID or "Unassigned"
driver = row['assigned_driver_id'] if row['assigned_driver_id'] else "Unassigned"
data.append([
row['order_id'],
row['customer_name'],
address,
row['status'],
row['priority'],
driver,
deadline
])
return data
except Exception as e:
print(f"Error fetching orders: {e}")
return [[f"Error: {str(e)}", "", "", "", "", "", ""]]
def get_order_details(order_id):
"""Get complete order details"""
if not order_id or order_id == "-":
return "Select an order from the table to view details"
try:
query = """
SELECT * FROM orders WHERE order_id = %s
"""
results = execute_query(query, (order_id,))
if not results:
return f"Order {order_id} not found"
order = results[0]
# Format the details nicely
details = f"""
# Order Details: {order_id}
## Customer Information
- **Name:** {order['customer_name']}
- **Phone:** {order['customer_phone'] or 'N/A'}
- **Email:** {order['customer_email'] or 'N/A'}
## Delivery Information
- **Address:** {order['delivery_address']}
- **Coordinates:** ({order['delivery_lat']}, {order['delivery_lng']})
- **Time Window:** {order['time_window_start']} to {order['time_window_end']}
## Package Details
- **Weight:** {order['weight_kg'] or 'N/A'} kg
- **Volume:** {order['volume_m3'] or 'N/A'} mΒ³
- **Is Fragile:** {"Yes" if order['is_fragile'] else "No"}
- **Requires Signature:** {"Yes" if order['requires_signature'] else "No"}
- **Requires Cold Storage:** {"Yes" if order['requires_cold_storage'] else "No"}
## Order Status
- **Status:** {order['status']}
- **Priority:** {order['priority']}
- **Assigned Driver:** {order['assigned_driver_id'] or 'Unassigned'}
## Payment
- **Order Value:** ${order['order_value'] or '0.00'}
- **Payment Status:** {order['payment_status']}
## Special Instructions
{order['special_instructions'] or 'None'}
## Timestamps
- **Created:** {order['created_at']}
- **Updated:** {order['updated_at']}
- **Delivered:** {order['delivered_at'] or 'Not delivered yet'}
"""
return details
except Exception as e:
return f"Error fetching order details: {str(e)}"
# ============================================
# DRIVERS FUNCTIONS
# ============================================
def get_all_drivers(status_filter="all", vehicle_filter="all", search_term=""):
"""Get drivers with filters"""
try:
where_clauses = []
params = []
if status_filter and status_filter != "all":
where_clauses.append("status = %s")
params.append(status_filter)
if vehicle_filter and vehicle_filter != "all":
where_clauses.append("vehicle_type = %s")
params.append(vehicle_filter)
if search_term:
where_clauses.append("(driver_id ILIKE %s OR name ILIKE %s OR phone ILIKE %s OR vehicle_plate ILIKE %s)")
search_pattern = f"%{search_term}%"
params.extend([search_pattern, search_pattern, search_pattern, search_pattern])
where_sql = " WHERE " + " AND ".join(where_clauses) if where_clauses else ""
query = f"""
SELECT
driver_id,
name,
phone,
status,
vehicle_type,
vehicle_plate,
current_lat,
current_lng,
last_location_update
FROM drivers
{where_sql}
ORDER BY name ASC
LIMIT 100
"""
results = execute_query(query, tuple(params) if params else ())
if not results:
return [["-", "-", "-", "-", "-", "-", "-"]]
# Format data for table
data = []
for row in results:
# Format location
if row['current_lat'] and row['current_lng']:
location = f"{row['current_lat']:.4f}, {row['current_lng']:.4f}"
else:
location = "No location"
# Format last update
last_update = row['last_location_update'].strftime("%Y-%m-%d %H:%M") if row['last_location_update'] else "Never"
data.append([
row['driver_id'],
row['name'],
row['phone'] or "N/A",
row['status'],
f"{row['vehicle_type']} - {row['vehicle_plate'] or 'N/A'}",
location,
last_update
])
return data
except Exception as e:
print(f"Error fetching drivers: {e}")
return [[f"Error: {str(e)}", "", "", "", "", "", ""]]
def get_driver_details(driver_id):
"""Get complete driver details"""
if not driver_id or driver_id == "-":
return "Select a driver from the table to view details"
try:
query = """
SELECT * FROM drivers WHERE driver_id = %s
"""
results = execute_query(query, (driver_id,))
if not results:
return f"Driver {driver_id} not found"
driver = results[0]
# Parse skills (handle both list and JSON string)
if driver['skills']:
if isinstance(driver['skills'], list):
skills = driver['skills']
else:
skills = json.loads(driver['skills'])
else:
skills = []
skills_str = ", ".join(skills) if skills else "None"
# Format the details nicely
details = f"""
# Driver Details: {driver_id}
## Personal Information
- **Name:** {driver['name']}
- **Phone:** {driver['phone'] or 'N/A'}
- **Email:** {driver['email'] or 'N/A'}
## Current Location
- **Coordinates:** ({driver['current_lat']}, {driver['current_lng']})
- **Last Update:** {driver['last_location_update'] or 'Never updated'}
## Status
- **Status:** {driver['status']}
## Vehicle Information
- **Type:** {driver['vehicle_type']}
- **Plate:** {driver['vehicle_plate'] or 'N/A'}
- **Capacity (kg):** {driver['capacity_kg'] or 'N/A'}
- **Capacity (mΒ³):** {driver['capacity_m3'] or 'N/A'}
## Skills & Certifications
{skills_str}
## Timestamps
- **Created:** {driver['created_at']}
- **Updated:** {driver['updated_at']}
"""
return details
except Exception as e:
return f"Error fetching driver details: {str(e)}"
def update_order_ui(order_id, **fields):
"""Update order via UI"""
from chat.tools import execute_tool
if not order_id:
return {"success": False, "message": "Order ID is required"}
# Build tool input
tool_input = {"order_id": order_id}
tool_input.update(fields)
# Call update tool
result = execute_tool("update_order", tool_input)
return result
def delete_order_ui(order_id):
"""Delete order via UI"""
from chat.tools import execute_tool
if not order_id:
return {"success": False, "message": "Order ID is required"}
# Call delete tool with confirmation
result = execute_tool("delete_order", {"order_id": order_id, "confirm": True})
return result
def update_driver_ui(driver_id, **fields):
"""Update driver via UI"""
from chat.tools import execute_tool
if not driver_id:
return {"success": False, "message": "Driver ID is required"}
# Build tool input
tool_input = {"driver_id": driver_id}
tool_input.update(fields)
# Call update tool
result = execute_tool("update_driver", tool_input)
return result
def delete_driver_ui(driver_id):
"""Delete driver via UI"""
from chat.tools import execute_tool
if not driver_id:
return {"success": False, "message": "Driver ID is required"}
# Call delete tool with confirmation
result = execute_tool("delete_driver", {"driver_id": driver_id, "confirm": True})
return result
# ============================================
# CHAT FUNCTIONS
# ============================================
def get_api_status():
"""Get API status for chat"""
full_status = chat_engine.get_full_status()
selected = full_status["selected"]
claude_status = full_status["claude"]["status"]
gemini_status = full_status["gemini"]["status"]
geocoding_status = geocoding_service.get_status()
claude_marker = "🎯 **ACTIVE** - " if selected == "anthropic" else ""
gemini_marker = "🎯 **ACTIVE** - " if selected == "gemini" else ""
return f"""**AI Provider:**
**Claude:** {claude_marker}{claude_status}
**Gemini:** {gemini_marker}{gemini_status}
**Geocoding:** {geocoding_status}
"""
def handle_chat_message(message, session_id):
"""Handle chat message from user"""
if session_id not in SESSIONS:
SESSIONS[session_id] = ConversationManager()
welcome = chat_engine.get_welcome_message()
SESSIONS[session_id].add_message("assistant", welcome)
conversation = SESSIONS[session_id]
if not message.strip():
return conversation.get_formatted_history(), conversation.get_tool_calls(), session_id
response, tool_calls = chat_engine.process_message(message, conversation)
return conversation.get_formatted_history(), conversation.get_tool_calls(), session_id
def reset_conversation(session_id):
"""Reset conversation to start fresh"""
new_session_id = str(uuid.uuid4())
new_conversation = ConversationManager()
welcome = chat_engine.get_welcome_message()
new_conversation.add_message("assistant", welcome)
SESSIONS[new_session_id] = new_conversation
return (
new_conversation.get_formatted_history(),
[],
new_session_id
)
def get_initial_chat():
"""Get initial chat state with welcome message"""
session_id = str(uuid.uuid4())
conversation = ConversationManager()
welcome = chat_engine.get_welcome_message()
conversation.add_message("assistant", welcome)
SESSIONS[session_id] = conversation
return conversation.get_formatted_history(), [], session_id
# ============================================
# GRADIO INTERFACE
# ============================================
def create_interface():
"""Create the Gradio interface with 3 enhanced tabs"""
with gr.Blocks(theme=gr.themes.Soft(), title="FleetMind Dispatch System") as app:
gr.Markdown("# 🚚 FleetMind Dispatch System")
gr.Markdown("*AI-Powered Delivery Coordination*")
with gr.Tabs():
# ==========================================
# TAB 1: CHAT
# ==========================================
with gr.Tab("πŸ’¬ Chat Assistant"):
provider_name = chat_engine.get_provider_name()
model_name = chat_engine.get_model_name()
gr.Markdown(f"### AI Dispatch Assistant")
gr.Markdown(f"*Powered by {provider_name} ({model_name})*")
# Quick Action Buttons
gr.Markdown("**Quick Actions:**")
# Row 1: Create and View
with gr.Row():
quick_create_order = gr.Button("πŸ“¦ Create Order", size="sm")
quick_view_orders = gr.Button("πŸ“‹ View Orders", size="sm")
quick_view_drivers = gr.Button("πŸ‘₯ View Drivers", size="sm")
quick_check_status = gr.Button("πŸ“Š Check Status", size="sm")
# Row 2: Order Management
with gr.Row():
quick_update_order = gr.Button("✏️ Update Order", size="sm", variant="secondary")
quick_delete_order = gr.Button("πŸ—‘οΈ Delete Order", size="sm", variant="secondary")
quick_update_driver = gr.Button("✏️ Update Driver", size="sm", variant="secondary")
quick_delete_driver = gr.Button("πŸ—‘οΈ Delete Driver", size="sm", variant="secondary")
# Chat interface
chatbot = gr.Chatbot(
label="Chat with AI Assistant",
height=600,
type="messages",
show_copy_button=True,
avatar_images=("πŸ‘€", "πŸ€–")
)
msg_input = gr.Textbox(
placeholder="Type your message here... (e.g., 'Create order for John at 123 Main St' or 'Show me available drivers')",
label="Your Message",
lines=3
)
with gr.Row():
send_btn = gr.Button("πŸ“€ Send Message", variant="primary", scale=3)
clear_btn = gr.Button("πŸ—‘οΈ Clear Chat", scale=1)
# API Status in accordion
with gr.Accordion("πŸ”§ System Status", open=False):
api_status = gr.Markdown(get_api_status())
# Tool usage display
with gr.Accordion("πŸ› οΈ Tool Usage Log", open=False):
gr.Markdown("*See what tools the AI is using behind the scenes*")
tool_display = gr.JSON(label="Tools Called", value=[])
# Session state
session_id_state = gr.State(value=None)
# Event handlers
def send_message(message, sess_id):
if sess_id is None:
sess_id = str(uuid.uuid4())
SESSIONS[sess_id] = ConversationManager()
welcome = chat_engine.get_welcome_message()
SESSIONS[sess_id].add_message("assistant", welcome)
chat_history, tools, new_sess_id = handle_chat_message(message, sess_id)
return chat_history, tools, new_sess_id, ""
# Quick action functions
def quick_action(prompt):
return prompt
quick_create_order.click(
fn=lambda: "Create a new order",
outputs=msg_input
)
quick_view_orders.click(
fn=lambda: "Show me all orders",
outputs=msg_input
)
quick_view_drivers.click(
fn=lambda: "Show me all available drivers",
outputs=msg_input
)
quick_check_status.click(
fn=lambda: "What is the current status of orders and drivers?",
outputs=msg_input
)
quick_update_order.click(
fn=lambda: "Update order [ORDER_ID] - change status to [STATUS]",
outputs=msg_input
)
quick_delete_order.click(
fn=lambda: "Delete order [ORDER_ID]",
outputs=msg_input
)
quick_update_driver.click(
fn=lambda: "Update driver [DRIVER_ID] - change status to [STATUS]",
outputs=msg_input
)
quick_delete_driver.click(
fn=lambda: "Delete driver [DRIVER_ID]",
outputs=msg_input
)
send_btn.click(
fn=send_message,
inputs=[msg_input, session_id_state],
outputs=[chatbot, tool_display, session_id_state, msg_input]
)
msg_input.submit(
fn=send_message,
inputs=[msg_input, session_id_state],
outputs=[chatbot, tool_display, session_id_state, msg_input]
)
clear_btn.click(
fn=reset_conversation,
inputs=[session_id_state],
outputs=[chatbot, tool_display, session_id_state]
)
# ==========================================
# TAB 2: ORDERS
# ==========================================
with gr.Tab("πŸ“¦ Orders Management"):
gr.Markdown("### Orders Dashboard")
# Statistics Cards
def update_order_stats():
stats = get_orders_stats()
return (
f"**Total:** {stats['total']}",
f"**Pending:** {stats['pending']}",
f"**In Transit:** {stats['in_transit']}",
f"**Delivered:** {stats['delivered']}"
)
with gr.Row():
stat_total = gr.Markdown("**Total:** 0")
stat_pending = gr.Markdown("**Pending:** 0")
stat_transit = gr.Markdown("**In Transit:** 0")
stat_delivered = gr.Markdown("**Delivered:** 0")
gr.Markdown("---")
# Filters
gr.Markdown("**Filters:**")
with gr.Row():
status_filter = gr.Dropdown(
choices=["all", "pending", "assigned", "in_transit", "delivered", "failed", "cancelled"],
value="all",
label="Status",
scale=1
)
priority_filter = gr.Dropdown(
choices=["all", "standard", "express", "urgent"],
value="all",
label="Priority",
scale=1
)
payment_filter = gr.Dropdown(
choices=["all", "pending", "paid", "cod"],
value="all",
label="Payment",
scale=1
)
search_orders = gr.Textbox(
placeholder="Search by Order ID, Customer, Phone...",
label="Search",
scale=2
)
with gr.Row():
apply_filters_btn = gr.Button("πŸ” Apply Filters", variant="primary", scale=1)
refresh_orders_btn = gr.Button("πŸ”„ Refresh", scale=1)
# Orders Table
orders_table = gr.Dataframe(
headers=["Order ID", "Customer", "Address", "Status", "Priority", "Driver", "Deadline"],
datatype=["str", "str", "str", "str", "str", "str", "str"],
label="Orders List (Click row to view details)",
value=get_all_orders(),
interactive=False,
wrap=True
)
# Order Details
gr.Markdown("---")
gr.Markdown("**Order Details:**")
order_details = gr.Markdown("*Select an order from the table above to view full details*")
# Edit/Delete Actions
gr.Markdown("---")
gr.Markdown("**Order Actions:**")
with gr.Row():
selected_order_id_edit = gr.Textbox(label="Order ID to Edit", placeholder="ORD-XXXXXXXX", scale=2)
edit_order_btn = gr.Button("✏️ Edit Order", variant="secondary", scale=1)
delete_order_btn = gr.Button("πŸ—‘οΈ Delete Order", variant="stop", scale=1)
# Edit Form (in accordion)
with gr.Accordion("Edit Order Form", open=False) as edit_accordion:
with gr.Row():
edit_customer_name = gr.Textbox(label="Customer Name")
edit_customer_phone = gr.Textbox(label="Customer Phone")
with gr.Row():
edit_status = gr.Dropdown(
choices=["pending", "assigned", "in_transit", "delivered", "failed", "cancelled"],
label="Status"
)
edit_priority = gr.Dropdown(
choices=["standard", "express", "urgent"],
label="Priority"
)
with gr.Row():
edit_payment_status = gr.Dropdown(
choices=["pending", "paid", "cod"],
label="Payment Status"
)
edit_weight_kg = gr.Number(label="Weight (kg)")
edit_special_instructions = gr.Textbox(label="Special Instructions", lines=2)
save_order_btn = gr.Button("πŸ’Ύ Save Changes", variant="primary")
# Action Results
action_result = gr.Markdown("")
# Event handlers
def filter_and_update_orders(status, priority, payment, search):
return get_all_orders(status, priority, payment, search)
def refresh_orders_and_stats(status, priority, payment, search):
stats = update_order_stats()
table = get_all_orders(status, priority, payment, search)
return stats[0], stats[1], stats[2], stats[3], table
def show_order_details(evt: gr.SelectData, table_data):
try:
# table_data is a pandas DataFrame from Gradio
if hasattr(table_data, 'iloc'):
# DataFrame - use iloc to access row and column
order_id = table_data.iloc[evt.index[0], 0]
else:
# List of lists - use standard indexing
order_id = table_data[evt.index[0]][0]
return get_order_details(order_id)
except Exception as e:
return f"Error: {str(e)}"
apply_filters_btn.click(
fn=filter_and_update_orders,
inputs=[status_filter, priority_filter, payment_filter, search_orders],
outputs=orders_table
)
refresh_orders_btn.click(
fn=refresh_orders_and_stats,
inputs=[status_filter, priority_filter, payment_filter, search_orders],
outputs=[stat_total, stat_pending, stat_transit, stat_delivered, orders_table]
)
orders_table.select(
fn=show_order_details,
inputs=[orders_table],
outputs=order_details
)
# Edit/Delete handlers
def handle_edit_order(order_id):
if not order_id:
return "Please enter an Order ID"
# Fetch order details and populate form
query = "SELECT * FROM orders WHERE order_id = %s"
from database.connection import execute_query
results = execute_query(query, (order_id,))
if not results:
return "Order not found"
order = results[0]
return (
order.get('customer_name', ''),
order.get('customer_phone', ''),
order.get('status', ''),
order.get('priority', ''),
order.get('payment_status', ''),
order.get('weight_kg', 0),
order.get('special_instructions', ''),
"Order loaded. Update fields and click Save Changes."
)
def handle_save_order(order_id, name, phone, status, priority, payment, weight, instructions, status_f, priority_f, payment_f, search):
if not order_id:
return "Please enter an Order ID", get_all_orders(status_f, priority_f, payment_f, search)
fields = {}
if name:
fields['customer_name'] = name
if phone:
fields['customer_phone'] = phone
if status:
fields['status'] = status
if priority:
fields['priority'] = priority
if payment:
fields['payment_status'] = payment
if weight:
fields['weight_kg'] = float(weight)
if instructions:
fields['special_instructions'] = instructions
result = update_order_ui(order_id, **fields)
# Refresh table
refreshed_table = get_all_orders(status_f, priority_f, payment_f, search)
if result['success']:
return f"βœ… {result['message']}", refreshed_table
else:
return f"❌ {result.get('error', 'Update failed')}", refreshed_table
def handle_delete_order(order_id, status_f, priority_f, payment_f, search):
if not order_id:
return "Please enter an Order ID", get_all_orders(status_f, priority_f, payment_f, search)
result = delete_order_ui(order_id)
# Refresh table
refreshed_table = get_all_orders(status_f, priority_f, payment_f, search)
if result['success']:
return f"βœ… {result['message']}", refreshed_table
else:
return f"❌ {result.get('error', 'Deletion failed')}", refreshed_table
edit_order_btn.click(
fn=handle_edit_order,
inputs=[selected_order_id_edit],
outputs=[edit_customer_name, edit_customer_phone, edit_status, edit_priority,
edit_payment_status, edit_weight_kg, edit_special_instructions, action_result]
)
save_order_btn.click(
fn=handle_save_order,
inputs=[selected_order_id_edit, edit_customer_name, edit_customer_phone, edit_status,
edit_priority, edit_payment_status, edit_weight_kg, edit_special_instructions,
status_filter, priority_filter, payment_filter, search_orders],
outputs=[action_result, orders_table]
)
delete_order_btn.click(
fn=handle_delete_order,
inputs=[selected_order_id_edit, status_filter, priority_filter, payment_filter, search_orders],
outputs=[action_result, orders_table]
)
# ==========================================
# TAB 3: DRIVERS
# ==========================================
with gr.Tab("πŸ‘₯ Drivers Management"):
gr.Markdown("### Drivers Dashboard")
# Statistics Cards
def update_driver_stats():
stats = get_drivers_stats()
return (
f"**Total:** {stats['total']}",
f"**Active:** {stats['active']}",
f"**Busy:** {stats['busy']}",
f"**Offline:** {stats['offline']}"
)
with gr.Row():
driver_stat_total = gr.Markdown("**Total:** 0")
driver_stat_active = gr.Markdown("**Active:** 0")
driver_stat_busy = gr.Markdown("**Busy:** 0")
driver_stat_offline = gr.Markdown("**Offline:** 0")
gr.Markdown("---")
# Filters
gr.Markdown("**Filters:**")
with gr.Row():
driver_status_filter = gr.Dropdown(
choices=["all", "active", "busy", "offline", "unavailable"],
value="all",
label="Status",
scale=1
)
vehicle_filter = gr.Dropdown(
choices=["all", "van", "truck", "car", "motorcycle"],
value="all",
label="Vehicle Type",
scale=1
)
search_drivers = gr.Textbox(
placeholder="Search by Driver ID, Name, Phone, Plate...",
label="Search",
scale=2
)
with gr.Row():
apply_driver_filters_btn = gr.Button("πŸ” Apply Filters", variant="primary", scale=1)
refresh_drivers_btn = gr.Button("πŸ”„ Refresh", scale=1)
# Drivers Table
drivers_table = gr.Dataframe(
headers=["Driver ID", "Name", "Phone", "Status", "Vehicle", "Location", "Last Update"],
datatype=["str", "str", "str", "str", "str", "str", "str"],
label="Drivers List (Click row to view details)",
value=get_all_drivers(),
interactive=False,
wrap=True
)
# Driver Details
gr.Markdown("---")
gr.Markdown("**Driver Details:**")
driver_details = gr.Markdown("*Select a driver from the table above to view full details*")
# Edit/Delete Actions
gr.Markdown("---")
gr.Markdown("**Driver Actions:**")
with gr.Row():
selected_driver_id_edit = gr.Textbox(label="Driver ID to Edit", placeholder="DRV-XXXXXXXX", scale=2)
edit_driver_btn = gr.Button("✏️ Edit Driver", variant="secondary", scale=1)
delete_driver_btn = gr.Button("πŸ—‘οΈ Delete Driver", variant="stop", scale=1)
# Edit Form (in accordion)
with gr.Accordion("Edit Driver Form", open=False) as driver_edit_accordion:
with gr.Row():
edit_driver_name = gr.Textbox(label="Driver Name")
edit_driver_phone = gr.Textbox(label="Phone")
with gr.Row():
edit_driver_email = gr.Textbox(label="Email")
edit_driver_status = gr.Dropdown(
choices=["active", "busy", "offline", "unavailable"],
label="Status"
)
with gr.Row():
edit_vehicle_type = gr.Textbox(label="Vehicle Type")
edit_vehicle_plate = gr.Textbox(label="Vehicle Plate")
with gr.Row():
edit_capacity_kg = gr.Number(label="Capacity (kg)")
edit_capacity_m3 = gr.Number(label="Capacity (mΒ³)")
save_driver_btn = gr.Button("πŸ’Ύ Save Changes", variant="primary")
# Action Results
driver_action_result = gr.Markdown("")
# Event handlers
def filter_and_update_drivers(status, vehicle, search):
return get_all_drivers(status, vehicle, search)
def refresh_drivers_and_stats(status, vehicle, search):
stats = update_driver_stats()
table = get_all_drivers(status, vehicle, search)
return stats[0], stats[1], stats[2], stats[3], table
def show_driver_details(evt: gr.SelectData, table_data):
try:
# table_data is a pandas DataFrame from Gradio
if hasattr(table_data, 'iloc'):
# DataFrame - use iloc to access row and column
driver_id = table_data.iloc[evt.index[0], 0]
else:
# List of lists - use standard indexing
driver_id = table_data[evt.index[0]][0]
return get_driver_details(driver_id)
except Exception as e:
return f"Error: {str(e)}"
apply_driver_filters_btn.click(
fn=filter_and_update_drivers,
inputs=[driver_status_filter, vehicle_filter, search_drivers],
outputs=drivers_table
)
refresh_drivers_btn.click(
fn=refresh_drivers_and_stats,
inputs=[driver_status_filter, vehicle_filter, search_drivers],
outputs=[driver_stat_total, driver_stat_active, driver_stat_busy, driver_stat_offline, drivers_table]
)
drivers_table.select(
fn=show_driver_details,
inputs=[drivers_table],
outputs=driver_details
)
# Edit/Delete handlers
def handle_edit_driver(driver_id):
if not driver_id:
return "Please enter a Driver ID"
# Fetch driver details and populate form
query = "SELECT * FROM drivers WHERE driver_id = %s"
from database.connection import execute_query
results = execute_query(query, (driver_id,))
if not results:
return "Driver not found"
driver = results[0]
return (
driver.get('name', ''),
driver.get('phone', ''),
driver.get('email', ''),
driver.get('status', ''),
driver.get('vehicle_type', ''),
driver.get('vehicle_plate', ''),
driver.get('capacity_kg', 0),
driver.get('capacity_m3', 0),
"Driver loaded. Update fields and click Save Changes."
)
def handle_save_driver(driver_id, name, phone, email, status, vehicle_type, vehicle_plate, capacity_kg, capacity_m3, status_f, vehicle_f, search):
if not driver_id:
return "Please enter a Driver ID", get_all_drivers(status_f, vehicle_f, search)
fields = {}
if name:
fields['name'] = name
if phone:
fields['phone'] = phone
if email:
fields['email'] = email
if status:
fields['status'] = status
if vehicle_type:
fields['vehicle_type'] = vehicle_type
if vehicle_plate:
fields['vehicle_plate'] = vehicle_plate
if capacity_kg:
fields['capacity_kg'] = float(capacity_kg)
if capacity_m3:
fields['capacity_m3'] = float(capacity_m3)
result = update_driver_ui(driver_id, **fields)
# Refresh table
refreshed_table = get_all_drivers(status_f, vehicle_f, search)
if result['success']:
return f"βœ… {result['message']}", refreshed_table
else:
return f"❌ {result.get('error', 'Update failed')}", refreshed_table
def handle_delete_driver(driver_id, status_f, vehicle_f, search):
if not driver_id:
return "Please enter a Driver ID", get_all_drivers(status_f, vehicle_f, search)
result = delete_driver_ui(driver_id)
# Refresh table
refreshed_table = get_all_drivers(status_f, vehicle_f, search)
if result['success']:
return f"βœ… {result['message']}", refreshed_table
else:
return f"❌ {result.get('error', 'Deletion failed')}", refreshed_table
edit_driver_btn.click(
fn=handle_edit_driver,
inputs=[selected_driver_id_edit],
outputs=[edit_driver_name, edit_driver_phone, edit_driver_email, edit_driver_status,
edit_vehicle_type, edit_vehicle_plate, edit_capacity_kg, edit_capacity_m3, driver_action_result]
)
save_driver_btn.click(
fn=handle_save_driver,
inputs=[selected_driver_id_edit, edit_driver_name, edit_driver_phone, edit_driver_email,
edit_driver_status, edit_vehicle_type, edit_vehicle_plate, edit_capacity_kg, edit_capacity_m3,
driver_status_filter, vehicle_filter, search_drivers],
outputs=[driver_action_result, drivers_table]
)
delete_driver_btn.click(
fn=handle_delete_driver,
inputs=[selected_driver_id_edit, driver_status_filter, vehicle_filter, search_drivers],
outputs=[driver_action_result, drivers_table]
)
gr.Markdown("---")
gr.Markdown("*FleetMind v1.0 - AI-Powered Dispatch Coordination*")
# Initialize chat and stats on load
app.load(
fn=get_initial_chat,
outputs=[chatbot, tool_display, session_id_state]
).then(
fn=update_order_stats,
outputs=[stat_total, stat_pending, stat_transit, stat_delivered]
).then(
fn=update_driver_stats,
outputs=[driver_stat_total, driver_stat_active, driver_stat_busy, driver_stat_offline]
)
return app
# ============================================
# MAIN
# ============================================
if __name__ == "__main__":
print("=" * 60)
print("FleetMind - Starting Enhanced UI")
print("=" * 60)
print("\nChecking database connection...")
if test_connection():
print("βœ… Database connected")
else:
print("❌ Database connection failed")
print("\nStarting Gradio interface...")
print("=" * 60)
app = create_interface()
app.launch(
server_name="0.0.0.0",
server_port=7860,
share=False,
show_error=True,
show_api=False
)