""" Llama + FAISS RAG System for Fire Evacuation with Advanced Reasoning This module implements a RAG (Retrieval-Augmented Generation) system for fire evacuation scenarios with advanced LLM reasoning techniques including: 1. Chain-of-Thought (CoT) Prompting: - Enables step-by-step reasoning through intermediate steps - Improves complex problem-solving capabilities - Reference: https://arxiv.org/pdf/2201.11903 2. Tree-of-Thoughts (ToT): - Maintains multiple reasoning paths - Self-evaluates progress through intermediate thoughts - Enables deliberate reasoning process - Reference: https://arxiv.org/pdf/2305.10601 3. Reflexion: - Reinforces language-based agents through linguistic feedback - Self-reflection and iterative improvement - Reference: https://arxiv.org/pdf/2303.11366 4. CoT with Tools: - Combines CoT prompting with external tools - Interleaved reasoning and tool usage - Reference: https://arxiv.org/pdf/2303.09014 5. Advanced Decoding Strategies: - Greedy: Deterministic highest probability - Sampling: Random sampling with temperature - Beam Search: Explores multiple paths - Nucleus (Top-p): Samples from top-p probability mass - Temperature: Temperature-based sampling Downloads Llama model, creates JSON dataset, builds FAISS index, and provides RAG querying """ import unsloth import json import os import pickle import glob import re from typing import List, Dict, Any, Optional, Tuple from pathlib import Path from enum import Enum import copy import numpy as np import faiss import torch from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline from sentence_transformers import SentenceTransformer import gradio as gr # Project imports (use helper_files package) from floor_plan import create_sample_floor_plan, FloorPlan from sensor_system import create_sample_fire_scenario, SensorSystem from pathfinding import PathFinder class FireEvacuationDataExporter: """Exports fire evacuation system data to JSON format""" def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem, pathfinder: PathFinder): self.floor_plan = floor_plan self.sensor_system = sensor_system self.pathfinder = pathfinder def export_room_data(self, room_id: str) -> Dict[str, Any]: """Export comprehensive room data to JSON""" room = self.floor_plan.get_room(room_id) sensor = self.sensor_system.get_sensor_reading(room_id) if not room or not sensor: return {} return { "room_id": room_id, "name": room.name, "room_type": room.room_type, "position": room.position, "size": room.size, "has_oxygen_cylinder": room.has_oxygen_cylinder, "has_fire_extinguisher": room.has_fire_extinguisher, "connected_to": [conn[0] for conn in room.connected_to], "sensor_data": { "fire_detected": sensor.fire_detected, "smoke_level": round(sensor.smoke_level, 2), "temperature_c": round(sensor.temperature, 1), "oxygen_pct": round(sensor.oxygen_level, 1), "visibility_pct": round(sensor.visibility, 1), "structural_integrity_pct": round(sensor.structural_integrity, 1), "fire_growth_rate": round(sensor.fire_growth_rate, 2), "flashover_risk": round(sensor.flashover_risk, 2), "backdraft_risk": round(sensor.backdraft_risk, 2), "heat_radiation": round(sensor.heat_radiation, 2), "fire_type": sensor.fire_type, "carbon_monoxide_ppm": round(sensor.carbon_monoxide, 1), "carbon_dioxide_ppm": round(sensor.carbon_dioxide, 1), "hydrogen_cyanide_ppm": round(sensor.hydrogen_cyanide, 2), "hydrogen_chloride_ppm": round(sensor.hydrogen_chloride, 2), "wind_direction": round(sensor.wind_direction, 1), "wind_speed": round(sensor.wind_speed, 2), "air_pressure": round(sensor.air_pressure, 2), "humidity": round(sensor.humidity, 1), "occupancy_density": round(sensor.occupancy_density, 2), "mobility_limitations": sensor.mobility_limitations, "panic_level": round(sensor.panic_level, 2), "evacuation_progress": round(sensor.evacuation_progress, 1), "sprinkler_active": sensor.sprinkler_active, "emergency_lighting": sensor.emergency_lighting, "elevator_available": sensor.elevator_available, "stairwell_clear": sensor.stairwell_clear, "exit_accessible": sensor.exit_accessible, "exit_capacity": sensor.exit_capacity, "ventilation_active": sensor.ventilation_active, "time_since_fire_start": sensor.time_since_fire_start, "estimated_time_to_exit": sensor.estimated_time_to_exit, "emergency_comm_working": sensor.emergency_comm_working, "wifi_signal_strength": round(sensor.wifi_signal_strength, 1), "danger_score": round(sensor.calculate_danger_score(), 1), "passable": sensor.is_passable() } } def export_route_data(self, start_location: str = "R1") -> Dict[str, Any]: """Export all evacuation routes with detailed information""" routes = self.pathfinder.find_all_evacuation_routes(start_location) route_data = { "timestamp_sec": 0, "start_location": start_location, "total_routes": len(routes), "routes": [] } for idx, (exit_id, path, risk) in enumerate(routes, 1): route_info = { "route_id": f"Route {idx}", "exit": exit_id, "path": path, "metrics": { "avg_danger": round(risk['avg_danger'], 2), "max_danger": round(risk['max_danger'], 2), "max_danger_location": risk['max_danger_location'], "total_danger": round(risk['total_danger'], 2), "path_length": risk['path_length'], "has_fire": risk['has_fire'], "has_oxygen_hazard": risk['has_oxygen_hazard'], "passable": risk['passable'], "risk_factors": risk['risk_factors'] }, "nodes": [] } # Add detailed node information for room_id in path: node_data = self.export_room_data(room_id) if node_data: route_info["nodes"].append(node_data) route_data["routes"].append(route_info) return route_data def export_all_rooms(self) -> List[Dict[str, Any]]: """Export all rooms as separate documents""" all_rooms = [] for room_id in self.floor_plan.rooms: room_data = self.export_room_data(room_id) if room_data: all_rooms.append(room_data) return all_rooms def export_to_json(self, output_path: str, start_location: str = "R1"): """Export complete dataset to JSON file""" data = { "floor_plan": { "floor_name": self.floor_plan.floor_name, "total_rooms": len(self.floor_plan.rooms), "exits": self.floor_plan.exits }, "all_rooms": self.export_all_rooms(), "evacuation_routes": self.export_route_data(start_location) } with open(output_path, 'w', encoding='utf-8') as f: json.dump(data, f, indent=2, ensure_ascii=False) print(f"[OK] Exported data to {output_path}") return data class ReasoningMode(Enum): """Enumeration of reasoning modes""" STANDARD = "standard" CHAIN_OF_THOUGHT = "chain_of_thought" TREE_OF_THOUGHTS = "tree_of_thoughts" REFLEXION = "reflexion" COT_WITH_TOOLS = "cot_with_tools" class DecodingStrategy(Enum): """Enumeration of decoding strategies""" GREEDY = "greedy" SAMPLING = "sampling" BEAM_SEARCH = "beam_search" NUCLEUS = "nucleus" TEMPERATURE = "temperature" class FireEvacuationRAG: """RAG system using FAISS for retrieval and Llama for generation with advanced reasoning""" def __init__(self, model_name: str = "nvidia/Llama-3.1-Minitron-4B-Width-Base", model_dir: str = "./models", use_8bit: bool = False, use_unsloth: bool = False, load_in_4bit: bool = True, max_seq_length: int = 2048, reasoning_mode: ReasoningMode = ReasoningMode.CHAIN_OF_THOUGHT, decoding_strategy: DecodingStrategy = DecodingStrategy.NUCLEUS): self.model_name = model_name self.model_dir = model_dir self.local_model_path = os.path.join(model_dir, model_name.replace("/", "_")) self.use_8bit = use_8bit self.use_unsloth = use_unsloth self.load_in_4bit = load_in_4bit self.max_seq_length = max_seq_length self.reasoning_mode = reasoning_mode self.decoding_strategy = decoding_strategy self.tokenizer = None self.model = None self.pipe = None self.embedder = None self.index = None self.documents = [] self.metadata = [] self.reflexion_history = [] # Store reflection history for Reflexion # Create model directory if it doesn't exist os.makedirs(self.model_dir, exist_ok=True) os.makedirs(self.local_model_path, exist_ok=True) print(f"Initializing RAG system with model: {model_name}") print(f"Model will be saved to: {self.local_model_path}") print(f"Reasoning mode: {reasoning_mode.value}") print(f"Decoding strategy: {decoding_strategy.value}") if use_unsloth: print("[*] Unsloth enabled (faster loading and inference)") if load_in_4bit: print(" - 4-bit quantization enabled (very fast, low memory)") elif use_8bit: print("[!] 8-bit quantization enabled (faster loading, lower memory, slight quality trade-off)") def _check_model_files_exist(self, model_path: str) -> bool: """Check if model files actually exist (not just config.json)""" required_files = [ "config.json", "model.safetensors.index.json" # Check for sharded model index ] # Check for at least one model file model_file_patterns = [ "model.safetensors", "pytorch_model.bin", "model-*.safetensors" # Sharded models ] config_exists = os.path.exists(os.path.join(model_path, "config.json")) if not config_exists: return False # Check for model weight files for pattern in model_file_patterns: if glob.glob(os.path.join(model_path, pattern)): return True # Check for sharded model index if os.path.exists(os.path.join(model_path, "model.safetensors.index.json")): return True return False def download_model(self): """Download and load the Llama model, saving weights to local directory""" print("Downloading Llama model (this may take a while)...") print(f"Model weights will be saved to: {self.local_model_path}") # Use Unsloth if enabled (much faster loading) - PRIMARY METHOD if self.use_unsloth: try: from unsloth import FastLanguageModel from transformers import TextStreamer print("[*] Using Unsloth for fast model loading...") # Check if model name indicates it's already quantized (contains "bnb-4bit" or "bnb-8bit") is_pre_quantized = "bnb-4bit" in self.model_name.lower() or "bnb-8bit" in self.model_name.lower() # For pre-quantized models, don't set load_in_4bit (model is already quantized) # For non-quantized models, check if bitsandbytes is available if self.load_in_4bit and not is_pre_quantized: try: import bitsandbytes print("[OK] bitsandbytes available for 4-bit quantization") except ImportError: print("[!] bitsandbytes not found. 4-bit quantization requires bitsandbytes.") print(" Install with: pip install bitsandbytes") print(" Falling back to full precision...") self.load_in_4bit = False # Check if model exists locally if self._check_model_files_exist(self.local_model_path): print(f"Loading from local path: {self.local_model_path}") model_path = self.local_model_path else: print(f"Downloading model: {self.model_name}") model_path = self.model_name # ==== Load Model with Unsloth (exact pattern from user) ==== dtype = None # Auto-detect dtype # Try loading with proper error handling for bitsandbytes # The model config might have quantization settings that trigger bitsandbytes check max_retries = 2 for attempt in range(max_retries): try: # For pre-quantized models, don't specify load_in_4bit (it's already quantized) if is_pre_quantized or attempt > 0: print("[OK] Loading model without quantization parameters...") # Don't pass any quantization parameters load_kwargs = { "model_name": model_path, "max_seq_length": self.max_seq_length, "dtype": dtype, } else: # For non-quantized models, try quantization if requested load_kwargs = { "model_name": model_path, "max_seq_length": self.max_seq_length, "dtype": dtype, } if self.load_in_4bit: load_kwargs["load_in_4bit"] = True self.model, self.tokenizer = FastLanguageModel.from_pretrained(**load_kwargs) break # Success, exit retry loop except (ImportError, Exception) as quant_error: error_str = str(quant_error) is_bitsandbytes_error = ( "bitsandbytes" in error_str.lower() or "PackageNotFoundError" in error_str or "No package metadata" in error_str or "quantization_config" in error_str.lower() ) if is_bitsandbytes_error and attempt < max_retries - 1: print(f"[!] Attempt {attempt + 1}: bitsandbytes error detected.") print(f" Error: {error_str[:150]}...") print(" Retrying without quantization parameters...") continue # Retry without quantization elif is_bitsandbytes_error: print("[!] bitsandbytes required but not installed.") print(" Options:") print(" 1. Install bitsandbytes: pip install bitsandbytes") print(" 2. Use a non-quantized model") print(" 3. Set USE_UNSLOTH=False to use standard loading") raise ImportError( "bitsandbytes is required for this model. " "Install with: pip install bitsandbytes" ) from quant_error else: # Re-raise if it's a different error raise # Optimize for inference FastLanguageModel.for_inference(self.model) print("[OK] Model loaded successfully with Unsloth!") # Verify device if torch.cuda.is_available(): actual_device = next(self.model.parameters()).device print(f"[OK] Model loaded on {actual_device}!") allocated = torch.cuda.memory_allocated(0) / 1024**3 print(f"[OK] GPU Memory allocated: {allocated:.2f} GB") else: print("[OK] Model loaded on CPU!") # Set pipe to model for compatibility (we'll use model directly in generation) self.pipe = self.model # Store model reference for compatibility checks return # Exit early, Unsloth loading complete except ImportError: print("[!] Unsloth not installed. Falling back to standard loading.") print(" Install with: pip install unsloth") self.use_unsloth = False # Disable unsloth for this session except Exception as e: print(f"[!] Unsloth loading failed: {e}") print(" Falling back to standard loading...") self.use_unsloth = False # Standard loading (original code) # Check GPU availability and optimize settings device = "cuda" if torch.cuda.is_available() else "cpu" if device == "cuda": gpu_name = torch.cuda.get_device_name(0) gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3 print(f"[OK] GPU detected: {gpu_name}") print(f"[OK] GPU Memory: {gpu_memory:.2f} GB") # Use bfloat16 for faster loading and inference on GPU torch_dtype = torch.bfloat16 print("[OK] Using bfloat16 precision for faster loading") else: print("[!] No GPU detected, using CPU (will be slower)") torch_dtype = torch.float32 print("[OK] Using float32 precision for CPU") # Check for optimized attention implementation try: import flash_attn # noqa: F401 attn_impl = 'flash_attention_2' print("[OK] FlashAttention2 available - using for optimal performance") except ImportError: attn_impl = 'sdpa' # Scaled Dot Product Attention (built into PyTorch) print("[OK] Using SDPA (Scaled Dot Product Attention) for faster inference") # Check for 8-bit quantization support use_quantization = False if self.use_8bit and device == "cuda": try: import bitsandbytes use_quantization = True print("[OK] 8-bit quantization available - will use for faster loading") except ImportError: print("[!] 8-bit requested but bitsandbytes not installed, using full precision") print(" Install with: pip install bitsandbytes") try: # Check if model already exists locally with actual model files if self._check_model_files_exist(self.local_model_path): print(f"Found existing model at {self.local_model_path}, loading from local...") model_path = self.local_model_path load_from_local = True else: print("Downloading model from HuggingFace...") model_path = self.model_name load_from_local = False # Load tokenizer print("Loading tokenizer...") self.tokenizer = AutoTokenizer.from_pretrained( model_path, trust_remote_code=True ) # Save tokenizer locally if downloaded (wrap in try-except to avoid crashes) if not load_from_local: try: print("Saving tokenizer to local directory...") self.tokenizer.save_pretrained(self.local_model_path) print(f"[OK] Tokenizer saved to {self.local_model_path}") except Exception as save_err: print(f"[!] Warning: Could not save tokenizer locally: {save_err}") print("Continuing without local save...") # Load model with optimizations print("Loading model with optimizations...") load_kwargs = { "trust_remote_code": True, "low_cpu_mem_usage": True, # Reduces memory usage during loading "_attn_implementation": attn_impl, # Optimized attention } # Add quantization or dtype if use_quantization: from transformers import BitsAndBytesConfig load_kwargs["quantization_config"] = BitsAndBytesConfig( load_in_8bit=True, llm_int8_threshold=6.0 ) print("[OK] Using 8-bit quantization for faster loading and lower memory") else: load_kwargs["torch_dtype"] = torch_dtype # Use device_map="auto" for GPU, manual placement for CPU if device == "cuda": try: load_kwargs["device_map"] = "auto" print("[OK] Using device_map='auto' for optimal GPU memory management") except Exception as e: print(f"[!] device_map='auto' failed, using manual GPU placement: {e}") load_kwargs.pop("device_map", None) self.model = AutoModelForCausalLM.from_pretrained( model_path, **load_kwargs ) # Manual device placement if device_map wasn't used if device == "cuda" and "device_map" not in load_kwargs: self.model = self.model.cuda() print("[OK] Model moved to GPU") # Save model locally if downloaded (wrap in try-except to handle DTensor errors) if not load_from_local: try: print("Saving model weights to local directory (this may take a while)...") self.model.save_pretrained( self.local_model_path, safe_serialization=True # Use safetensors format ) print(f"[OK] Model saved to {self.local_model_path}") except ImportError as import_err: if "DTensor" in str(import_err): print(f"[!] Warning: Could not save model due to PyTorch/transformers compatibility issue: {import_err}") print("This is a known issue with certain versions. Model will work but won't be saved locally.") print("Continuing without local save...") else: raise except Exception as save_err: print(f"[!] Warning: Could not save model locally: {save_err}") print("Continuing without local save...") # Create pipeline with optimizations print("Creating pipeline...") pipeline_kwargs = { "model": self.model, "tokenizer": self.tokenizer, } if device == "cuda": pipeline_kwargs["device_map"] = "auto" self.pipe = pipeline("text-generation", **pipeline_kwargs) # Verify model device if device == "cuda": actual_device = next(self.model.parameters()).device print(f"[OK] Model loaded successfully on {actual_device}!") if torch.cuda.is_available(): allocated = torch.cuda.memory_allocated(0) / 1024**3 print(f"[OK] GPU Memory allocated: {allocated:.2f} GB") else: print("[OK] Model loaded successfully on CPU!") except Exception as e: print(f"Error loading model: {e}") print("Falling back to pipeline-only loading...") try: # Determine device and dtype for fallback device = "cuda" if torch.cuda.is_available() else "cpu" torch_dtype = torch.bfloat16 if device == "cuda" else torch.float32 # Try loading from local path first (only if model files actually exist) if self._check_model_files_exist(self.local_model_path): print(f"Attempting to load from local path: {self.local_model_path}") pipeline_kwargs = { "model": self.local_model_path, "trust_remote_code": True, "torch_dtype": torch_dtype, } if device == "cuda": pipeline_kwargs["device_map"] = "auto" self.pipe = pipeline("text-generation", **pipeline_kwargs) # Extract tokenizer from pipeline if available if hasattr(self.pipe, 'tokenizer'): self.tokenizer = self.pipe.tokenizer else: print(f"Downloading model: {self.model_name}") pipeline_kwargs = { "model": self.model_name, "trust_remote_code": True, "torch_dtype": torch_dtype, } if device == "cuda": pipeline_kwargs["device_map"] = "auto" self.pipe = pipeline("text-generation", **pipeline_kwargs) # Extract tokenizer from pipeline if available if hasattr(self.pipe, 'tokenizer'): self.tokenizer = self.pipe.tokenizer # Try to save after loading (but don't fail if it doesn't work) try: if hasattr(self.pipe, 'model') and hasattr(self.pipe.model, 'save_pretrained'): print("Attempting to save downloaded model to local directory...") self.pipe.model.save_pretrained(self.local_model_path, safe_serialization=True) if hasattr(self.pipe, 'tokenizer'): self.pipe.tokenizer.save_pretrained(self.local_model_path) print("[OK] Model saved successfully") except ImportError as import_err: if "DTensor" in str(import_err): print(f"[!] Warning: Could not save model due to compatibility issue. Model will work but won't be saved locally.") else: print(f"[!] Warning: Could not save model: {import_err}") except Exception as save_err: print(f"[!] Warning: Could not save model locally: {save_err}") except Exception as e2: print(f"Pipeline loading also failed: {e2}") raise def load_embedder(self, model_name: str = "all-MiniLM-L6-v2"): """Load sentence transformer for embeddings, saving to local directory""" embedder_dir = os.path.join(self.model_dir, "embedder", model_name.replace("/", "_")) os.makedirs(embedder_dir, exist_ok=True) print(f"Loading embedding model: {model_name}...") print(f"Embedder will be cached in: {embedder_dir}") # Check if embedder exists locally (check for actual model files, not just config) config_path = os.path.join(embedder_dir, "config.json") has_model_files = False if os.path.exists(config_path): # Check if model files exist model_files = glob.glob(os.path.join(embedder_dir, "*.safetensors")) + \ glob.glob(os.path.join(embedder_dir, "pytorch_model.bin")) if model_files or os.path.exists(os.path.join(embedder_dir, "model.safetensors.index.json")): has_model_files = True if has_model_files: print(f"Loading embedder from local cache: {embedder_dir}") self.embedder = SentenceTransformer(embedder_dir) else: print("Downloading embedder from HuggingFace...") self.embedder = SentenceTransformer(model_name, cache_folder=embedder_dir) # Try to save to local directory (but don't fail if it doesn't work) try: self.embedder.save(embedder_dir) print(f"[OK] Embedder saved to {embedder_dir}") except ImportError as import_err: if "DTensor" in str(import_err): print(f"[!] Warning: Could not save embedder due to PyTorch/transformers compatibility issue: {import_err}") print("This is a known issue with certain versions. Embedder will work but won't be saved locally.") print("Continuing without local save...") else: print(f"[!] Warning: Could not save embedder: {import_err}") except Exception as save_err: print(f"[!] Warning: Could not save embedder locally: {save_err}") print("Continuing without local save...") print("[OK] Embedding model loaded!") def build_faiss_index(self, documents: List[str], metadata: List[Dict] = None): """ Build FAISS index from documents Args: documents: List of text documents to index metadata: Optional metadata for each document """ if not self.embedder: self.load_embedder() print(f"Building FAISS index for {len(documents)} documents...") # Generate embeddings embeddings = self.embedder.encode(documents, show_progress_bar=True) embeddings = np.array(embeddings).astype('float32') # Get dimension dimension = embeddings.shape[1] # Create FAISS index (L2 distance) self.index = faiss.IndexFlatL2(dimension) # Add embeddings to index self.index.add(embeddings) # Store documents and metadata self.documents = documents self.metadata = metadata if metadata else [{}] * len(documents) print(f"[OK] FAISS index built with {self.index.ntotal} vectors") def build_index_from_json(self, json_data: Dict[str, Any]): """Build FAISS index from exported JSON data""" documents = [] metadata = [] # Add room documents for room in json_data.get("all_rooms", []): # Create text representation room_text = self._room_to_text(room) documents.append(room_text) metadata.append({ "type": "room", "room_id": room.get("room_id"), "data": room }) # Add route documents for route in json_data.get("evacuation_routes", {}).get("routes", []): route_text = self._route_to_text(route) documents.append(route_text) metadata.append({ "type": "route", "route_id": route.get("route_id"), "exit": route.get("exit"), "data": route }) # Build index self.build_faiss_index(documents, metadata) def _room_to_text(self, room: Dict[str, Any]) -> str: """Convert room data to searchable text""" sensor = room.get("sensor_data", {}) text_parts = [ f"Room {room.get('room_id')} ({room.get('name')})", f"Type: {room.get('room_type')}", ] if room.get("has_oxygen_cylinder"): text_parts.append("[!]️ OXYGEN CYLINDER PRESENT - EXPLOSION RISK") if sensor.get("fire_detected"): text_parts.append("[FIRE] FIRE DETECTED") text_parts.extend([ f"Temperature: {sensor.get('temperature_c')}°C", f"Smoke level: {sensor.get('smoke_level')}", f"Oxygen: {sensor.get('oxygen_pct')}%", f"Visibility: {sensor.get('visibility_pct')}%", f"Structural integrity: {sensor.get('structural_integrity_pct')}%", f"Danger score: {sensor.get('danger_score')}", f"Passable: {sensor.get('passable')}" ]) if sensor.get("carbon_monoxide_ppm", 0) > 50: text_parts.append(f"[!]️ HIGH CARBON MONOXIDE: {sensor.get('carbon_monoxide_ppm')} ppm") if sensor.get("flashover_risk", 0) > 0.5: text_parts.append(f"[!]️ FLASHOVER RISK: {sensor.get('flashover_risk')*100:.0f}%") if not sensor.get("exit_accessible", True): text_parts.append("[!]️ EXIT BLOCKED") if sensor.get("occupancy_density", 0) > 0.7: text_parts.append(f"[!]️ HIGH CROWD DENSITY: {sensor.get('occupancy_density')*100:.0f}%") return " | ".join(text_parts) def _route_to_text(self, route: Dict[str, Any]) -> str: """Convert route data to searchable text""" metrics = route.get("metrics", {}) text_parts = [ f"{route.get('route_id')} to {route.get('exit')}", f"Path: {' → '.join(route.get('path', []))}", f"Average danger: {metrics.get('avg_danger')}", f"Max danger: {metrics.get('max_danger')} at {metrics.get('max_danger_location')}", f"Passable: {metrics.get('passable')}", f"Has fire: {metrics.get('has_fire')}", f"Has oxygen hazard: {metrics.get('has_oxygen_hazard')}" ] risk_factors = metrics.get("risk_factors", []) if risk_factors: text_parts.append(f"Risks: {', '.join(risk_factors[:3])}") return " | ".join(text_parts) def search(self, query: str, k: int = 5) -> List[Dict[str, Any]]: """ Search FAISS index for relevant documents Args: query: Search query k: Number of results to return Returns: List of relevant documents with metadata """ if not self.index or not self.embedder: raise ValueError("Index not built. Call build_faiss_index() first.") # Encode query query_embedding = self.embedder.encode([query]) query_embedding = np.array(query_embedding).astype('float32') # Search distances, indices = self.index.search(query_embedding, k) # Return results results = [] for i, idx in enumerate(indices[0]): if idx < len(self.documents): results.append({ "document": self.documents[idx], "metadata": self.metadata[idx], "distance": float(distances[0][i]) }) return results def _build_cot_prompt(self, query: str, context: List[str]) -> str: """Build Chain-of-Thought prompt with step-by-step reasoning""" context_text = "\n".join([f"- {ctx}" for ctx in context]) prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to answer the question concisely. CONTEXT: {context_text} QUESTION: {query} Think step by step, then provide a brief answer: REASONING: 1. Analyze available information 2. Identify key safety factors 3. Evaluate risks and prioritize 4. Conclude with recommendation ANSWER:""" return prompt def _build_tot_prompt(self, query: str, context: List[str], thought: str = "") -> str: """Build Tree-of-Thoughts prompt for exploring multiple reasoning paths""" context_text = "\n".join([f"- {ctx}" for ctx in context]) if not thought: prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to explore different reasoning approaches. CONTEXT: {context_text} QUESTION: {query} Let's explore different reasoning approaches to solve this problem: APPROACH 1 - Safety-First Analysis: """ else: prompt = f"""CONTEXT: {context_text} QUESTION: {query} CURRENT THOUGHT: {thought} Evaluate this thought: - Is this reasoning sound? - What are the strengths and weaknesses? - What alternative approaches should we consider? EVALUATION: """ return prompt def _build_reflexion_prompt(self, query: str, context: List[str], previous_answer: str = "", reflection: str = "") -> str: """Build Reflexion prompt for self-reflection and improvement""" context_text = "\n".join([f"- {ctx}" for ctx in context]) if not previous_answer: # Initial answer prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to answer the question. CONTEXT: {context_text} QUESTION: {query} Provide a clear, safety-focused answer based on the context. ANSWER:""" else: # Reflection phase prompt = f"""You are an expert fire evacuation safety advisor. Review and improve your previous answer. CONTEXT: {context_text} QUESTION: {query} PREVIOUS ANSWER: {previous_answer} REFLECTION: {reflection} Now provide an improved answer based on your reflection: IMPROVED ANSWER:""" return prompt def _build_cot_with_tools_prompt(self, query: str, context: List[str], tool_results: List[str] = None) -> str: """Build Chain-of-Thought prompt with tool integration""" context_text = "\n".join([f"- {ctx}" for ctx in context]) tool_text = "" if tool_results: tool_text = "\nTOOL RESULTS:\n" + "\n".join([f"- {result}" for result in tool_results]) prompt = f"""You are an expert fire evacuation safety advisor. Use the following context and tool results to answer the question. CONTEXT: {context_text} {tool_text} QUESTION: {query} Let's solve this step by step, using both the context and tool results: STEP 1 - Understand the question and available data: """ return prompt def _generate_with_decoding_strategy(self, prompt: str, max_length: int = 500, temperature: float = 0.7, top_p: float = 0.9, num_beams: int = 3, stop_sequences: List[str] = None) -> str: """Generate response using specified decoding strategy""" if not self.pipe and not self.model: raise ValueError("Model not loaded. Call download_model() first.") try: if self.use_unsloth and self.model: inputs = self.tokenizer( prompt, return_tensors="pt", truncation=True, max_length=self.max_seq_length ).to(self.model.device) # Configure generation parameters based on decoding strategy gen_kwargs = { "max_new_tokens": max_length, "pad_token_id": self.tokenizer.eos_token_id, "eos_token_id": self.tokenizer.eos_token_id, } if self.decoding_strategy == DecodingStrategy.GREEDY: gen_kwargs.update({ "do_sample": False, "num_beams": 1 }) elif self.decoding_strategy == DecodingStrategy.SAMPLING: gen_kwargs.update({ "do_sample": True, "temperature": temperature, "top_k": 50 }) elif self.decoding_strategy == DecodingStrategy.BEAM_SEARCH: gen_kwargs.update({ "do_sample": False, "num_beams": num_beams, "early_stopping": True }) elif self.decoding_strategy == DecodingStrategy.NUCLEUS: gen_kwargs.update({ "do_sample": True, "temperature": temperature, "top_p": top_p, "top_k": 0 }) elif self.decoding_strategy == DecodingStrategy.TEMPERATURE: gen_kwargs.update({ "do_sample": True, "temperature": temperature }) with torch.no_grad(): outputs = self.model.generate(**inputs, **gen_kwargs) response = self.tokenizer.batch_decode( outputs, skip_special_tokens=True )[0] # Extract response after prompt if prompt in response: response = response.split(prompt)[-1].strip() # Post-process to stop at verbose endings stop_phrases = [ "\n\nHowever, please note", "\n\nAdditionally,", "\n\nLet me know", "\n\nIf you have", "\n\nHere's another", "\n\nQUESTION:", "\n\nLet's break", "\n\nHave a great day", "\n\nI'm here to help" ] for phrase in stop_phrases: if phrase in response: response = response.split(phrase)[0].strip() break return response else: # Use pipeline for standard models gen_kwargs = { "max_length": len(self.tokenizer.encode(prompt)) + max_length, "num_return_sequences": 1, } if self.decoding_strategy == DecodingStrategy.GREEDY: gen_kwargs.update({ "do_sample": False }) elif self.decoding_strategy == DecodingStrategy.SAMPLING: gen_kwargs.update({ "do_sample": True, "temperature": temperature, "top_k": 50 }) elif self.decoding_strategy == DecodingStrategy.BEAM_SEARCH: gen_kwargs.update({ "do_sample": False, "num_beams": num_beams, "early_stopping": True }) elif self.decoding_strategy == DecodingStrategy.NUCLEUS: gen_kwargs.update({ "do_sample": True, "temperature": temperature, "top_p": top_p, "top_k": 0 }) elif self.decoding_strategy == DecodingStrategy.TEMPERATURE: gen_kwargs.update({ "do_sample": True, "temperature": temperature }) gen_kwargs["pad_token_id"] = self.tokenizer.eos_token_id if self.tokenizer else None outputs = self.pipe(prompt, **gen_kwargs) response = outputs[0]['generated_text'] # Extract response after prompt if prompt in response: response = response.split(prompt)[-1].strip() # Post-process to stop at verbose endings stop_phrases = [ "\n\nHowever, please note", "\n\nAdditionally,", "\n\nLet me know", "\n\nIf you have", "\n\nHere's another", "\n\nQUESTION:", "\n\nLet's break", "\n\nHave a great day", "\n\nI'm here to help" ] for phrase in stop_phrases: if phrase in response: response = response.split(phrase)[0].strip() break return response except Exception as e: return f"Error generating response: {e}" def _chain_of_thought_reasoning(self, query: str, context: List[str], max_length: int = 500) -> Tuple[str, str]: """Generate response using Chain-of-Thought reasoning Returns: Tuple of (full_reasoning, final_answer) """ prompt = self._build_cot_prompt(query, context) # Use shorter max_length for CoT to prevent verbosity full_response = self._generate_with_decoding_strategy(prompt, max_length=min(max_length, 300)) # Extract reasoning steps (everything before ANSWER) reasoning = "" if "REASONING:" in full_response: reasoning_parts = full_response.split("REASONING:") if len(reasoning_parts) > 1: reasoning_section = reasoning_parts[1].split("ANSWER:")[0] if "ANSWER:" in reasoning_parts[1] else reasoning_parts[1] reasoning = reasoning_section.strip() elif "ANSWER:" in full_response: reasoning = full_response.split("ANSWER:")[0].strip() else: # Try to extract reasoning from numbered steps lines = full_response.split('\n') reasoning_lines = [] for line in lines: if line.strip().startswith(('1.', '2.', '3.', '4.', '5.', 'Step', 'STEP')): reasoning_lines.append(line.strip()) elif "ANSWER" in line.upper(): break elif reasoning_lines: # Continue collecting if we've started reasoning_lines.append(line.strip()) reasoning = '\n'.join(reasoning_lines) # Extract final answer (everything after ANSWER:) final_answer = full_response if "ANSWER:" in full_response: answer_parts = full_response.split("ANSWER:") if len(answer_parts) > 1: answer_text = answer_parts[-1].strip() # Stop at common continuation markers stop_markers = [ "\n\nHowever, please note", "\n\nAdditionally,", "\n\nLet me know", "\n\nIf you have", "\n\nHere's another", "\n\nQUESTION:", "\n\nLet's break", "\n\nHave a great day", "\n\nI'm here to help", "\n\nThese general guidelines", "\n\nIf you have any further" ] for marker in stop_markers: if marker in answer_text: answer_text = answer_text.split(marker)[0].strip() break # Also limit to first 2-3 sentences if it's still too long sentences = answer_text.split('. ') if len(sentences) > 3: answer_text = '. '.join(sentences[:3]) if not answer_text.endswith('.'): answer_text += '.' final_answer = answer_text # Clean up reasoning - remove verbose parts if reasoning: # Remove common verbose endings verbose_endings = [ "However, please note", "Additionally,", "Let me know", "If you have", "Here's another", "Have a great day", "I'm here to help" ] for ending in verbose_endings: if ending in reasoning: reasoning = reasoning.split(ending)[0].strip() break return reasoning or "Reasoning steps generated", final_answer def _tree_of_thoughts_reasoning(self, query: str, context: List[str], max_length: int = 500, max_thoughts: int = 3) -> Tuple[str, str]: """Generate response using Tree-of-Thoughts reasoning Returns: Tuple of (full_reasoning, final_answer) """ thoughts = [] reasoning_log = [] # Generate initial thoughts for i in range(max_thoughts): thought_prompt = self._build_tot_prompt(query, context, thought=f"Exploring approach {i+1}") thought = self._generate_with_decoding_strategy(thought_prompt, max_length // max_thoughts) thoughts.append(thought) reasoning_log.append(f"APPROACH {i+1}:\n{thought}\n") # Evaluate thoughts and select best evaluation_prompt = f"""Evaluate these different reasoning approaches for answering the question: QUESTION: {query} APPROACHES: """ for i, thought in enumerate(thoughts, 1): evaluation_prompt += f"\nAPPROACH {i}:\n{thought}\n" evaluation_prompt += "\nWhich approach is most sound and complete? Provide the best answer based on the evaluation.\n\nBEST ANSWER:" final_response = self._generate_with_decoding_strategy(evaluation_prompt, max_length) full_reasoning = "\n".join(reasoning_log) + f"\n\nEVALUATION:\n{final_response}" return full_reasoning, final_response def _reflexion_reasoning(self, query: str, context: List[str], max_length: int = 500, max_iterations: int = 2) -> Tuple[str, str]: """Generate response using Reflexion (self-reflection and improvement) Returns: Tuple of (full_reasoning, final_answer) """ reasoning_log = [] # Initial answer initial_prompt = self._build_reflexion_prompt(query, context) answer = self._generate_with_decoding_strategy(initial_prompt, max_length) reasoning_log.append(f"INITIAL ANSWER:\n{answer}\n") # Reflection and improvement iterations for iteration in range(max_iterations): # Generate reflection reflection_prompt = f"""Review this answer for a fire evacuation safety question: QUESTION: {query} CURRENT ANSWER: {answer} What could be improved? Consider: - Accuracy of safety information - Completeness of the response - Clarity and actionability - Missing critical safety factors REFLECTION:""" reflection = self._generate_with_decoding_strategy(reflection_prompt, max_length // 2) reasoning_log.append(f"ITERATION {iteration + 1} - REFLECTION:\n{reflection}\n") # Generate improved answer improved_prompt = self._build_reflexion_prompt(query, context, answer, reflection) improved_answer = self._generate_with_decoding_strategy(improved_prompt, max_length) reasoning_log.append(f"ITERATION {iteration + 1} - IMPROVED ANSWER:\n{improved_answer}\n") # Check if improvement is significant (simple heuristic) if len(improved_answer) > len(answer) * 0.8: # At least 80% of original length answer = improved_answer else: break # Stop if answer becomes too short self.reflexion_history.append({ "query": query, "final_answer": answer, "iterations": iteration + 1 }) full_reasoning = "\n".join(reasoning_log) return full_reasoning, answer def _cot_with_tools_reasoning(self, query: str, context: List[str], max_length: int = 500) -> Tuple[str, str]: """Generate response using Chain-of-Thought with tool integration Returns: Tuple of (full_reasoning, final_answer) """ reasoning_log = [] # Simulate tool calls (in real implementation, these would call actual tools) tool_results = [] # Tool 1: Route analysis if "route" in query.lower() or "path" in query.lower(): tool_result = "Tool: Route Analyzer - Found 3 evacuation routes with risk scores" tool_results.append(tool_result) reasoning_log.append(f"TOOL CALL: {tool_result}\n") # Tool 2: Risk calculator if "danger" in query.lower() or "risk" in query.lower(): tool_result = "Tool: Risk Calculator - Calculated danger scores for all rooms" tool_results.append(tool_result) reasoning_log.append(f"TOOL CALL: {tool_result}\n") # Tool 3: Sensor aggregator if "sensor" in query.lower() or "temperature" in query.lower() or "smoke" in query.lower(): tool_result = "Tool: Sensor Aggregator - Aggregated sensor data from all rooms" tool_results.append(tool_result) reasoning_log.append(f"TOOL CALL: {tool_result}\n") prompt = self._build_cot_with_tools_prompt(query, context, tool_results) response = self._generate_with_decoding_strategy(prompt, max_length) reasoning_log.append(f"REASONING WITH TOOLS:\n{response}\n") full_reasoning = "\n".join(reasoning_log) # Extract final answer final_answer = response if "ANSWER:" in response or "answer:" in response.lower(): parts = response.split("ANSWER:") if "ANSWER:" in response else response.split("answer:") if len(parts) > 1: final_answer = parts[-1].strip() return full_reasoning, final_answer def generate_response(self, query: str, context: List[str] = None, max_length: int = 500, return_reasoning: bool = False) -> str: """ Generate response using Llama model with context and advanced reasoning Args: query: User query context: Optional context strings (if None, will retrieve from FAISS) max_length: Maximum response length return_reasoning: If True, returns tuple of (reasoning, answer), else just answer Returns: If return_reasoning is True: Tuple of (reasoning_steps, final_answer) Otherwise: Just the final answer string """ if not self.pipe and not self.model: raise ValueError("Model not loaded. Call download_model() first.") # Retrieve context if not provided if context is None: search_results = self.search(query, k=3) context = [r["document"] for r in search_results] # Route to appropriate reasoning method based on mode if self.reasoning_mode == ReasoningMode.CHAIN_OF_THOUGHT: reasoning, answer = self._chain_of_thought_reasoning(query, context, max_length) elif self.reasoning_mode == ReasoningMode.TREE_OF_THOUGHTS: reasoning, answer = self._tree_of_thoughts_reasoning(query, context, max_length) elif self.reasoning_mode == ReasoningMode.REFLEXION: reasoning, answer = self._reflexion_reasoning(query, context, max_length) elif self.reasoning_mode == ReasoningMode.COT_WITH_TOOLS: reasoning, answer = self._cot_with_tools_reasoning(query, context, max_length) else: # Standard mode - use enhanced prompt with decoding strategy context_text = "\n".join([f"- {ctx}" for ctx in context]) prompt = f"""You are an expert fire evacuation safety advisor. Use the following context about the building's fire safety status to answer the question. CONTEXT: {context_text} QUESTION: {query} Provide a clear, safety-focused answer based on the context. If the context doesn't contain enough information, say so. ANSWER:""" answer = self._generate_with_decoding_strategy(prompt, max_length) reasoning = f"Standard reasoning mode - Direct answer generation.\n\n{answer}" if return_reasoning: return reasoning, answer return answer def set_reasoning_mode(self, mode: ReasoningMode): """Set the reasoning mode for future queries""" self.reasoning_mode = mode print(f"[OK] Reasoning mode set to: {mode.value}") def set_decoding_strategy(self, strategy: DecodingStrategy): """Set the decoding strategy for future queries""" self.decoding_strategy = strategy print(f"[OK] Decoding strategy set to: {strategy.value}") def query(self, question: str, k: int = 3, reasoning_mode: Optional[ReasoningMode] = None, show_reasoning: bool = True) -> Dict[str, Any]: """ Complete RAG query: retrieve context and generate response with advanced reasoning Args: question: User question k: Number of context documents to retrieve reasoning_mode: Optional override for reasoning mode (uses instance default if None) show_reasoning: If True, includes full reasoning steps in response Returns: Dictionary with answer, context, metadata, reasoning information, and reasoning steps """ # Retrieve relevant context search_results = self.search(question, k=k) # Generate response with reasoning context = [r["document"] for r in search_results] # Temporarily override reasoning mode if provided original_mode = self.reasoning_mode if reasoning_mode is not None: self.reasoning_mode = reasoning_mode try: reasoning, answer = self.generate_response(question, context, return_reasoning=True) finally: # Restore original mode self.reasoning_mode = original_mode result = { "question": question, "answer": answer, "context": context, "reasoning_mode": self.reasoning_mode.value, "decoding_strategy": self.decoding_strategy.value, "sources": [ { "type": r["metadata"].get("type"), "room_id": r["metadata"].get("room_id"), "route_id": r["metadata"].get("route_id"), "relevance_score": 1.0 / (1.0 + r["distance"]) } for r in search_results ] } if show_reasoning: result["reasoning_steps"] = reasoning return result def save_index(self, index_path: str, metadata_path: str): """Save FAISS index and metadata""" if self.index: faiss.write_index(self.index, index_path) with open(metadata_path, 'wb') as f: pickle.dump({ "documents": self.documents, "metadata": self.metadata }, f) print(f"[OK] Saved index to {index_path} and metadata to {metadata_path}") def load_index(self, index_path: str, metadata_path: str): """Load FAISS index and metadata""" self.index = faiss.read_index(index_path) with open(metadata_path, 'rb') as f: data = pickle.load(f) self.documents = data["documents"] self.metadata = data["metadata"] print(f"[OK] Loaded index with {self.index.ntotal} vectors") def compare_reasoning_modes(self, question: str, k: int = 3) -> Dict[str, Any]: """ Compare all reasoning modes for a given question Args: question: User question k: Number of context documents to retrieve Returns: Dictionary with answers from all reasoning modes """ # Retrieve context once search_results = self.search(question, k=k) context = [r["document"] for r in search_results] results = { "question": question, "context": context, "sources": [ { "type": r["metadata"].get("type"), "room_id": r["metadata"].get("room_id"), "route_id": r["metadata"].get("route_id"), "relevance_score": 1.0 / (1.0 + r["distance"]) } for r in search_results ], "answers": {} } # Save original mode original_mode = self.reasoning_mode # Test each reasoning mode for mode in ReasoningMode: try: self.reasoning_mode = mode reasoning, answer = self.generate_response(question, context, return_reasoning=True) results["answers"][mode.value] = { "answer": answer, "reasoning": reasoning, "length": len(answer) } except Exception as e: results["answers"][mode.value] = { "error": str(e) } # Restore original mode self.reasoning_mode = original_mode return results # === Gradio integration === _rag_instance: Optional[FireEvacuationRAG] = None def _init_rag() -> FireEvacuationRAG: """Initialize and cache the RAG system for Gradio use.""" global _rag_instance if _rag_instance is not None: return _rag_instance # Configuration (match original defaults, but without noisy prints) USE_UNSLOTH = True USE_8BIT = False UNSLOTH_MODEL = "unsloth/Meta-Llama-3.1-8B-Instruct" # Set model directory to absolute path MODEL_DIR = r"D:\github\cse499\models" # Create fire evacuation system floor_plan = create_sample_floor_plan() sensor_system = create_sample_fire_scenario(floor_plan) pathfinder = PathFinder(floor_plan, sensor_system) # Export data and build index exporter = FireEvacuationDataExporter(floor_plan, sensor_system, pathfinder) json_data = exporter.export_to_json("fire_evacuation_data.json", start_location="R1") # Initialize RAG if USE_UNSLOTH: rag = FireEvacuationRAG( model_name=UNSLOTH_MODEL, model_dir=MODEL_DIR, use_unsloth=True, load_in_4bit=False, max_seq_length=2048, reasoning_mode=ReasoningMode.CHAIN_OF_THOUGHT, decoding_strategy=DecodingStrategy.NUCLEUS, ) else: rag = FireEvacuationRAG( model_name="nvidia/Llama-3.1-Minitron-4B-Width-Base", model_dir=MODEL_DIR, use_8bit=USE_8BIT, reasoning_mode=ReasoningMode.CHAIN_OF_THOUGHT, decoding_strategy=DecodingStrategy.NUCLEUS, ) rag.download_model() rag.load_embedder() rag.build_index_from_json(json_data) _rag_instance = rag return rag def gradio_answer(question: str) -> str: """Gradio callback: take a text question, return LLM/RAG answer.""" question = (question or "").strip() if not question: return "Please enter a question about fire evacuation or building safety." rag = _init_rag() result = rag.query(question, k=3, show_reasoning=False) return result.get("answer", "No answer generated.") if __name__ == "__main__": iface = gr.Interface( fn=gradio_answer, inputs=gr.Textbox(lines=3, label="Fire Evacuation Question"), outputs=gr.Textbox(lines=6, label="LLM Recommendation"), title="Fire Evacuation RAG Advisor", description="Ask about evacuation routes, dangers, and exits in the simulated building.", ) iface.launch()