|
|
""" |
|
|
Llama + FAISS RAG System for Fire Evacuation with Advanced Reasoning |
|
|
|
|
|
This module implements a RAG (Retrieval-Augmented Generation) system for fire evacuation scenarios |
|
|
with advanced LLM reasoning techniques including: |
|
|
|
|
|
1. Chain-of-Thought (CoT) Prompting: |
|
|
- Enables step-by-step reasoning through intermediate steps |
|
|
- Improves complex problem-solving capabilities |
|
|
- Reference: https://arxiv.org/pdf/2201.11903 |
|
|
|
|
|
2. Tree-of-Thoughts (ToT): |
|
|
- Maintains multiple reasoning paths |
|
|
- Self-evaluates progress through intermediate thoughts |
|
|
- Enables deliberate reasoning process |
|
|
- Reference: https://arxiv.org/pdf/2305.10601 |
|
|
|
|
|
3. Reflexion: |
|
|
- Reinforces language-based agents through linguistic feedback |
|
|
- Self-reflection and iterative improvement |
|
|
- Reference: https://arxiv.org/pdf/2303.11366 |
|
|
|
|
|
4. CoT with Tools: |
|
|
- Combines CoT prompting with external tools |
|
|
- Interleaved reasoning and tool usage |
|
|
- Reference: https://arxiv.org/pdf/2303.09014 |
|
|
|
|
|
5. Advanced Decoding Strategies: |
|
|
- Greedy: Deterministic highest probability |
|
|
   - Sampling: Random sampling with temperature (top-k = 50)
|
|
- Beam Search: Explores multiple paths |
|
|
- Nucleus (Top-p): Samples from top-p probability mass |
|
|
   - Temperature: Sampling whose randomness is controlled by the temperature parameter
|
|
|
|
|
Downloads the Llama model, creates a JSON dataset, builds a FAISS index, and provides RAG querying.
|
|
""" |
|
|
# Unsloth must be imported before transformers so its runtime patches take effect,
# but it is kept optional so the standard (non-Unsloth) loading path works without it.
try:
    import unsloth  # noqa: F401
except ImportError:
    unsloth = None
|
|
import json |
|
|
import os |
|
|
import pickle |
|
|
import glob |
|
|
import re |
|
|
from typing import List, Dict, Any, Optional, Tuple |
|
|
|
|
|
from pathlib import Path |
|
|
from enum import Enum |
|
|
import copy |
|
|
|
|
|
import numpy as np |
|
|
import faiss |
|
|
import torch |
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM, pipeline |
|
|
from sentence_transformers import SentenceTransformer |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
from floor_plan import create_sample_floor_plan, FloorPlan |
|
|
from sensor_system import create_sample_fire_scenario, SensorSystem |
|
|
from pathfinding import PathFinder |
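
# ---------------------------------------------------------------------------
# Illustrative quick-start sketch (comments only, nothing runs at import time).
# It mirrors the flow implemented by _init_rag() further below; the output path
# matches the one used there, while the question string is only an example.
#
#   floor_plan = create_sample_floor_plan()
#   sensors = create_sample_fire_scenario(floor_plan)
#   pathfinder = PathFinder(floor_plan, sensors)
#   exporter = FireEvacuationDataExporter(floor_plan, sensors, pathfinder)
#   json_data = exporter.export_to_json("fire_evacuation_data.json", start_location="R1")
#
#   rag = FireEvacuationRAG(reasoning_mode=ReasoningMode.CHAIN_OF_THOUGHT,
#                           decoding_strategy=DecodingStrategy.NUCLEUS)
#   rag.download_model()
#   rag.load_embedder()
#   rag.build_index_from_json(json_data)
#   result = rag.query("Which exit is safest from R1?", k=3)
#   print(result["answer"])
# ---------------------------------------------------------------------------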
|
|
|
|
|
|
|
|
|
|
|
class FireEvacuationDataExporter: |
|
|
"""Exports fire evacuation system data to JSON format""" |
|
|
|
|
|
def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem, pathfinder: PathFinder): |
|
|
self.floor_plan = floor_plan |
|
|
self.sensor_system = sensor_system |
|
|
self.pathfinder = pathfinder |
|
|
|
|
|
def export_room_data(self, room_id: str) -> Dict[str, Any]: |
|
|
"""Export comprehensive room data to JSON""" |
|
|
room = self.floor_plan.get_room(room_id) |
|
|
sensor = self.sensor_system.get_sensor_reading(room_id) |
|
|
|
|
|
if not room or not sensor: |
|
|
return {} |
|
|
|
|
|
return { |
|
|
"room_id": room_id, |
|
|
"name": room.name, |
|
|
"room_type": room.room_type, |
|
|
"position": room.position, |
|
|
"size": room.size, |
|
|
"has_oxygen_cylinder": room.has_oxygen_cylinder, |
|
|
"has_fire_extinguisher": room.has_fire_extinguisher, |
|
|
"connected_to": [conn[0] for conn in room.connected_to], |
|
|
"sensor_data": { |
|
|
"fire_detected": sensor.fire_detected, |
|
|
"smoke_level": round(sensor.smoke_level, 2), |
|
|
"temperature_c": round(sensor.temperature, 1), |
|
|
"oxygen_pct": round(sensor.oxygen_level, 1), |
|
|
"visibility_pct": round(sensor.visibility, 1), |
|
|
"structural_integrity_pct": round(sensor.structural_integrity, 1), |
|
|
"fire_growth_rate": round(sensor.fire_growth_rate, 2), |
|
|
"flashover_risk": round(sensor.flashover_risk, 2), |
|
|
"backdraft_risk": round(sensor.backdraft_risk, 2), |
|
|
"heat_radiation": round(sensor.heat_radiation, 2), |
|
|
"fire_type": sensor.fire_type, |
|
|
"carbon_monoxide_ppm": round(sensor.carbon_monoxide, 1), |
|
|
"carbon_dioxide_ppm": round(sensor.carbon_dioxide, 1), |
|
|
"hydrogen_cyanide_ppm": round(sensor.hydrogen_cyanide, 2), |
|
|
"hydrogen_chloride_ppm": round(sensor.hydrogen_chloride, 2), |
|
|
"wind_direction": round(sensor.wind_direction, 1), |
|
|
"wind_speed": round(sensor.wind_speed, 2), |
|
|
"air_pressure": round(sensor.air_pressure, 2), |
|
|
"humidity": round(sensor.humidity, 1), |
|
|
"occupancy_density": round(sensor.occupancy_density, 2), |
|
|
"mobility_limitations": sensor.mobility_limitations, |
|
|
"panic_level": round(sensor.panic_level, 2), |
|
|
"evacuation_progress": round(sensor.evacuation_progress, 1), |
|
|
"sprinkler_active": sensor.sprinkler_active, |
|
|
"emergency_lighting": sensor.emergency_lighting, |
|
|
"elevator_available": sensor.elevator_available, |
|
|
"stairwell_clear": sensor.stairwell_clear, |
|
|
"exit_accessible": sensor.exit_accessible, |
|
|
"exit_capacity": sensor.exit_capacity, |
|
|
"ventilation_active": sensor.ventilation_active, |
|
|
"time_since_fire_start": sensor.time_since_fire_start, |
|
|
"estimated_time_to_exit": sensor.estimated_time_to_exit, |
|
|
"emergency_comm_working": sensor.emergency_comm_working, |
|
|
"wifi_signal_strength": round(sensor.wifi_signal_strength, 1), |
|
|
"danger_score": round(sensor.calculate_danger_score(), 1), |
|
|
"passable": sensor.is_passable() |
|
|
} |
|
|
} |
|
|
|
|
|
def export_route_data(self, start_location: str = "R1") -> Dict[str, Any]: |
|
|
"""Export all evacuation routes with detailed information""" |
|
|
routes = self.pathfinder.find_all_evacuation_routes(start_location) |
|
|
|
|
|
route_data = { |
|
|
"timestamp_sec": 0, |
|
|
"start_location": start_location, |
|
|
"total_routes": len(routes), |
|
|
"routes": [] |
|
|
} |
|
|
|
|
|
for idx, (exit_id, path, risk) in enumerate(routes, 1): |
|
|
route_info = { |
|
|
"route_id": f"Route {idx}", |
|
|
"exit": exit_id, |
|
|
"path": path, |
|
|
"metrics": { |
|
|
"avg_danger": round(risk['avg_danger'], 2), |
|
|
"max_danger": round(risk['max_danger'], 2), |
|
|
"max_danger_location": risk['max_danger_location'], |
|
|
"total_danger": round(risk['total_danger'], 2), |
|
|
"path_length": risk['path_length'], |
|
|
"has_fire": risk['has_fire'], |
|
|
"has_oxygen_hazard": risk['has_oxygen_hazard'], |
|
|
"passable": risk['passable'], |
|
|
"risk_factors": risk['risk_factors'] |
|
|
}, |
|
|
"nodes": [] |
|
|
} |
|
|
|
|
|
|
|
|
for room_id in path: |
|
|
node_data = self.export_room_data(room_id) |
|
|
if node_data: |
|
|
route_info["nodes"].append(node_data) |
|
|
|
|
|
route_data["routes"].append(route_info) |
|
|
|
|
|
return route_data |
|
|
|
|
|
def export_all_rooms(self) -> List[Dict[str, Any]]: |
|
|
"""Export all rooms as separate documents""" |
|
|
all_rooms = [] |
|
|
for room_id in self.floor_plan.rooms: |
|
|
room_data = self.export_room_data(room_id) |
|
|
if room_data: |
|
|
all_rooms.append(room_data) |
|
|
return all_rooms |
|
|
|
|
|
def export_to_json(self, output_path: str, start_location: str = "R1"): |
|
|
"""Export complete dataset to JSON file""" |
|
|
data = { |
|
|
"floor_plan": { |
|
|
"floor_name": self.floor_plan.floor_name, |
|
|
"total_rooms": len(self.floor_plan.rooms), |
|
|
"exits": self.floor_plan.exits |
|
|
}, |
|
|
"all_rooms": self.export_all_rooms(), |
|
|
"evacuation_routes": self.export_route_data(start_location) |
|
|
} |
|
|
|
|
|
with open(output_path, 'w', encoding='utf-8') as f: |
|
|
json.dump(data, f, indent=2, ensure_ascii=False) |
|
|
|
|
|
print(f"[OK] Exported data to {output_path}") |
|
|
return data |
|
|
|
|
|
|
|
|
class ReasoningMode(Enum): |
|
|
"""Enumeration of reasoning modes""" |
|
|
STANDARD = "standard" |
|
|
CHAIN_OF_THOUGHT = "chain_of_thought" |
|
|
TREE_OF_THOUGHTS = "tree_of_thoughts" |
|
|
REFLEXION = "reflexion" |
|
|
COT_WITH_TOOLS = "cot_with_tools" |
|
|
|
|
|
|
|
|
class DecodingStrategy(Enum): |
|
|
"""Enumeration of decoding strategies""" |
|
|
GREEDY = "greedy" |
|
|
SAMPLING = "sampling" |
|
|
BEAM_SEARCH = "beam_search" |
|
|
NUCLEUS = "nucleus" |
|
|
TEMPERATURE = "temperature" |
|
|
|
|
|
|
|
|
class FireEvacuationRAG: |
|
|
"""RAG system using FAISS for retrieval and Llama for generation with advanced reasoning""" |
|
|
|
|
|
def __init__(self, model_name: str = "nvidia/Llama-3.1-Minitron-4B-Width-Base", model_dir: str = "./models", |
|
|
use_8bit: bool = False, use_unsloth: bool = False, load_in_4bit: bool = True, max_seq_length: int = 2048, |
|
|
reasoning_mode: ReasoningMode = ReasoningMode.CHAIN_OF_THOUGHT, |
|
|
decoding_strategy: DecodingStrategy = DecodingStrategy.NUCLEUS): |
|
|
self.model_name = model_name |
|
|
self.model_dir = model_dir |
|
|
self.local_model_path = os.path.join(model_dir, model_name.replace("/", "_")) |
|
|
self.use_8bit = use_8bit |
|
|
self.use_unsloth = use_unsloth |
|
|
self.load_in_4bit = load_in_4bit |
|
|
self.max_seq_length = max_seq_length |
|
|
self.reasoning_mode = reasoning_mode |
|
|
self.decoding_strategy = decoding_strategy |
|
|
self.tokenizer = None |
|
|
self.model = None |
|
|
self.pipe = None |
|
|
self.embedder = None |
|
|
self.index = None |
|
|
self.documents = [] |
|
|
self.metadata = [] |
|
|
self.reflexion_history = [] |
|
|
|
|
|
|
|
|
os.makedirs(self.model_dir, exist_ok=True) |
|
|
os.makedirs(self.local_model_path, exist_ok=True) |
|
|
|
|
|
print(f"Initializing RAG system with model: {model_name}") |
|
|
print(f"Model will be saved to: {self.local_model_path}") |
|
|
print(f"Reasoning mode: {reasoning_mode.value}") |
|
|
print(f"Decoding strategy: {decoding_strategy.value}") |
|
|
if use_unsloth: |
|
|
print("[*] Unsloth enabled (faster loading and inference)") |
|
|
if load_in_4bit: |
|
|
print(" - 4-bit quantization enabled (very fast, low memory)") |
|
|
elif use_8bit: |
|
|
print("[!] 8-bit quantization enabled (faster loading, lower memory, slight quality trade-off)") |
|
|
|
|
|
def _check_model_files_exist(self, model_path: str) -> bool: |
|
|
"""Check if model files actually exist (not just config.json)""" |
|
|
required_files = [ |
|
|
"config.json", |
|
|
"model.safetensors.index.json" |
|
|
] |
|
|
|
|
|
|
|
|
model_file_patterns = [ |
|
|
"model.safetensors", |
|
|
"pytorch_model.bin", |
|
|
"model-*.safetensors" |
|
|
] |
|
|
|
|
|
config_exists = os.path.exists(os.path.join(model_path, "config.json")) |
|
|
if not config_exists: |
|
|
return False |
|
|
|
|
|
|
|
|
for pattern in model_file_patterns: |
|
|
if glob.glob(os.path.join(model_path, pattern)): |
|
|
return True |
|
|
|
|
|
|
|
|
if os.path.exists(os.path.join(model_path, "model.safetensors.index.json")): |
|
|
return True |
|
|
|
|
|
return False |
|
|
|
|
|
def download_model(self): |
|
|
"""Download and load the Llama model, saving weights to local directory""" |
|
|
print("Downloading Llama model (this may take a while)...") |
|
|
print(f"Model weights will be saved to: {self.local_model_path}") |
|
|
|
|
|
|
|
|
if self.use_unsloth: |
|
|
try: |
|
|
from unsloth import FastLanguageModel |
|
|
from transformers import TextStreamer |
|
|
print("[*] Using Unsloth for fast model loading...") |
|
|
|
|
|
|
|
|
is_pre_quantized = "bnb-4bit" in self.model_name.lower() or "bnb-8bit" in self.model_name.lower() |
|
|
|
|
|
|
|
|
|
|
|
if self.load_in_4bit and not is_pre_quantized: |
|
|
try: |
|
|
import bitsandbytes |
|
|
print("[OK] bitsandbytes available for 4-bit quantization") |
|
|
except ImportError: |
|
|
print("[!] bitsandbytes not found. 4-bit quantization requires bitsandbytes.") |
|
|
print(" Install with: pip install bitsandbytes") |
|
|
print(" Falling back to full precision...") |
|
|
self.load_in_4bit = False |
|
|
|
|
|
|
|
|
if self._check_model_files_exist(self.local_model_path): |
|
|
print(f"Loading from local path: {self.local_model_path}") |
|
|
model_path = self.local_model_path |
|
|
else: |
|
|
print(f"Downloading model: {self.model_name}") |
|
|
model_path = self.model_name |
|
|
|
|
|
|
|
|
                dtype = None  # None lets Unsloth auto-detect (float16 or bfloat16 depending on the GPU)
|
|
|
|
|
|
|
|
|
|
|
max_retries = 2 |
|
|
for attempt in range(max_retries): |
|
|
try: |
|
|
|
|
|
                        # Build the load kwargs once; skip quantization flags for pre-quantized
                        # checkpoints or when retrying after a quantization-related failure.
                        load_kwargs = {
                            "model_name": model_path,
                            "max_seq_length": self.max_seq_length,
                            "dtype": dtype,
                        }
                        if is_pre_quantized or attempt > 0:
                            print("[OK] Loading model without quantization parameters...")
                        elif self.load_in_4bit:
                            load_kwargs["load_in_4bit"] = True
|
|
|
|
|
self.model, self.tokenizer = FastLanguageModel.from_pretrained(**load_kwargs) |
|
|
break |
|
|
|
|
|
                    except Exception as quant_error:  # Exception already covers ImportError
|
|
error_str = str(quant_error) |
|
|
is_bitsandbytes_error = ( |
|
|
"bitsandbytes" in error_str.lower() or |
|
|
"PackageNotFoundError" in error_str or |
|
|
"No package metadata" in error_str or |
|
|
"quantization_config" in error_str.lower() |
|
|
) |
|
|
|
|
|
if is_bitsandbytes_error and attempt < max_retries - 1: |
|
|
print(f"[!] Attempt {attempt + 1}: bitsandbytes error detected.") |
|
|
print(f" Error: {error_str[:150]}...") |
|
|
print(" Retrying without quantization parameters...") |
|
|
continue |
|
|
elif is_bitsandbytes_error: |
|
|
print("[!] bitsandbytes required but not installed.") |
|
|
print(" Options:") |
|
|
print(" 1. Install bitsandbytes: pip install bitsandbytes") |
|
|
print(" 2. Use a non-quantized model") |
|
|
print(" 3. Set USE_UNSLOTH=False to use standard loading") |
|
|
raise ImportError( |
|
|
"bitsandbytes is required for this model. " |
|
|
"Install with: pip install bitsandbytes" |
|
|
) from quant_error |
|
|
else: |
|
|
|
|
|
raise |
|
|
|
|
|
|
|
|
FastLanguageModel.for_inference(self.model) |
|
|
|
|
|
print("[OK] Model loaded successfully with Unsloth!") |
|
|
|
|
|
|
|
|
if torch.cuda.is_available(): |
|
|
actual_device = next(self.model.parameters()).device |
|
|
print(f"[OK] Model loaded on {actual_device}!") |
|
|
allocated = torch.cuda.memory_allocated(0) / 1024**3 |
|
|
print(f"[OK] GPU Memory allocated: {allocated:.2f} GB") |
|
|
else: |
|
|
print("[OK] Model loaded on CPU!") |
|
|
|
|
|
|
|
|
self.pipe = self.model |
|
|
|
|
|
return |
|
|
|
|
|
except ImportError: |
|
|
print("[!] Unsloth not installed. Falling back to standard loading.") |
|
|
print(" Install with: pip install unsloth") |
|
|
self.use_unsloth = False |
|
|
except Exception as e: |
|
|
print(f"[!] Unsloth loading failed: {e}") |
|
|
print(" Falling back to standard loading...") |
|
|
self.use_unsloth = False |
|
|
|
|
|
|
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
if device == "cuda": |
|
|
gpu_name = torch.cuda.get_device_name(0) |
|
|
gpu_memory = torch.cuda.get_device_properties(0).total_memory / 1024**3 |
|
|
print(f"[OK] GPU detected: {gpu_name}") |
|
|
print(f"[OK] GPU Memory: {gpu_memory:.2f} GB") |
|
|
|
|
|
torch_dtype = torch.bfloat16 |
|
|
print("[OK] Using bfloat16 precision for faster loading") |
|
|
else: |
|
|
print("[!] No GPU detected, using CPU (will be slower)") |
|
|
torch_dtype = torch.float32 |
|
|
print("[OK] Using float32 precision for CPU") |
|
|
|
|
|
|
|
|
try: |
|
|
import flash_attn |
|
|
attn_impl = 'flash_attention_2' |
|
|
print("[OK] FlashAttention2 available - using for optimal performance") |
|
|
except ImportError: |
|
|
attn_impl = 'sdpa' |
|
|
print("[OK] Using SDPA (Scaled Dot Product Attention) for faster inference") |
|
|
|
|
|
|
|
|
use_quantization = False |
|
|
if self.use_8bit and device == "cuda": |
|
|
try: |
|
|
import bitsandbytes |
|
|
use_quantization = True |
|
|
print("[OK] 8-bit quantization available - will use for faster loading") |
|
|
except ImportError: |
|
|
print("[!] 8-bit requested but bitsandbytes not installed, using full precision") |
|
|
print(" Install with: pip install bitsandbytes") |
|
|
|
|
|
try: |
|
|
|
|
|
if self._check_model_files_exist(self.local_model_path): |
|
|
print(f"Found existing model at {self.local_model_path}, loading from local...") |
|
|
model_path = self.local_model_path |
|
|
load_from_local = True |
|
|
else: |
|
|
print("Downloading model from HuggingFace...") |
|
|
model_path = self.model_name |
|
|
load_from_local = False |
|
|
|
|
|
|
|
|
print("Loading tokenizer...") |
|
|
self.tokenizer = AutoTokenizer.from_pretrained( |
|
|
model_path, |
|
|
trust_remote_code=True |
|
|
) |
|
|
|
|
|
|
|
|
if not load_from_local: |
|
|
try: |
|
|
print("Saving tokenizer to local directory...") |
|
|
self.tokenizer.save_pretrained(self.local_model_path) |
|
|
print(f"[OK] Tokenizer saved to {self.local_model_path}") |
|
|
except Exception as save_err: |
|
|
print(f"[!] Warning: Could not save tokenizer locally: {save_err}") |
|
|
print("Continuing without local save...") |
|
|
|
|
|
|
|
|
print("Loading model with optimizations...") |
|
|
            load_kwargs = {
                "trust_remote_code": True,
                "low_cpu_mem_usage": True,
                "attn_implementation": attn_impl,
            }
|
|
|
|
|
|
|
|
if use_quantization: |
|
|
from transformers import BitsAndBytesConfig |
|
|
load_kwargs["quantization_config"] = BitsAndBytesConfig( |
|
|
load_in_8bit=True, |
|
|
llm_int8_threshold=6.0 |
|
|
) |
|
|
print("[OK] Using 8-bit quantization for faster loading and lower memory") |
|
|
else: |
|
|
load_kwargs["torch_dtype"] = torch_dtype |
|
|
|
|
|
|
|
|
            # device_map="auto" lets HF Accelerate spread the weights across available GPU memory
            if device == "cuda":
                load_kwargs["device_map"] = "auto"
                print("[OK] Using device_map='auto' for optimal GPU memory management")
|
|
|
|
|
self.model = AutoModelForCausalLM.from_pretrained( |
|
|
model_path, |
|
|
**load_kwargs |
|
|
) |
|
|
|
|
|
|
|
|
if device == "cuda" and "device_map" not in load_kwargs: |
|
|
self.model = self.model.cuda() |
|
|
print("[OK] Model moved to GPU") |
|
|
|
|
|
|
|
|
if not load_from_local: |
|
|
try: |
|
|
print("Saving model weights to local directory (this may take a while)...") |
|
|
self.model.save_pretrained( |
|
|
self.local_model_path, |
|
|
safe_serialization=True |
|
|
) |
|
|
print(f"[OK] Model saved to {self.local_model_path}") |
|
|
except ImportError as import_err: |
|
|
if "DTensor" in str(import_err): |
|
|
print(f"[!] Warning: Could not save model due to PyTorch/transformers compatibility issue: {import_err}") |
|
|
print("This is a known issue with certain versions. Model will work but won't be saved locally.") |
|
|
print("Continuing without local save...") |
|
|
else: |
|
|
raise |
|
|
except Exception as save_err: |
|
|
print(f"[!] Warning: Could not save model locally: {save_err}") |
|
|
print("Continuing without local save...") |
|
|
|
|
|
|
|
|
print("Creating pipeline...") |
|
|
pipeline_kwargs = { |
|
|
"model": self.model, |
|
|
"tokenizer": self.tokenizer, |
|
|
} |
|
|
if device == "cuda": |
|
|
pipeline_kwargs["device_map"] = "auto" |
|
|
|
|
|
self.pipe = pipeline("text-generation", **pipeline_kwargs) |
|
|
|
|
|
|
|
|
if device == "cuda": |
|
|
actual_device = next(self.model.parameters()).device |
|
|
print(f"[OK] Model loaded successfully on {actual_device}!") |
|
|
if torch.cuda.is_available(): |
|
|
allocated = torch.cuda.memory_allocated(0) / 1024**3 |
|
|
print(f"[OK] GPU Memory allocated: {allocated:.2f} GB") |
|
|
else: |
|
|
print("[OK] Model loaded successfully on CPU!") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error loading model: {e}") |
|
|
print("Falling back to pipeline-only loading...") |
|
|
try: |
|
|
|
|
|
device = "cuda" if torch.cuda.is_available() else "cpu" |
|
|
torch_dtype = torch.bfloat16 if device == "cuda" else torch.float32 |
|
|
|
|
|
|
|
|
if self._check_model_files_exist(self.local_model_path): |
|
|
print(f"Attempting to load from local path: {self.local_model_path}") |
|
|
pipeline_kwargs = { |
|
|
"model": self.local_model_path, |
|
|
"trust_remote_code": True, |
|
|
"torch_dtype": torch_dtype, |
|
|
} |
|
|
if device == "cuda": |
|
|
pipeline_kwargs["device_map"] = "auto" |
|
|
self.pipe = pipeline("text-generation", **pipeline_kwargs) |
|
|
|
|
|
if hasattr(self.pipe, 'tokenizer'): |
|
|
self.tokenizer = self.pipe.tokenizer |
|
|
else: |
|
|
print(f"Downloading model: {self.model_name}") |
|
|
pipeline_kwargs = { |
|
|
"model": self.model_name, |
|
|
"trust_remote_code": True, |
|
|
"torch_dtype": torch_dtype, |
|
|
} |
|
|
if device == "cuda": |
|
|
pipeline_kwargs["device_map"] = "auto" |
|
|
self.pipe = pipeline("text-generation", **pipeline_kwargs) |
|
|
|
|
|
if hasattr(self.pipe, 'tokenizer'): |
|
|
self.tokenizer = self.pipe.tokenizer |
|
|
|
|
|
|
|
|
try: |
|
|
if hasattr(self.pipe, 'model') and hasattr(self.pipe.model, 'save_pretrained'): |
|
|
print("Attempting to save downloaded model to local directory...") |
|
|
self.pipe.model.save_pretrained(self.local_model_path, safe_serialization=True) |
|
|
if hasattr(self.pipe, 'tokenizer'): |
|
|
self.pipe.tokenizer.save_pretrained(self.local_model_path) |
|
|
print("[OK] Model saved successfully") |
|
|
except ImportError as import_err: |
|
|
if "DTensor" in str(import_err): |
|
|
print(f"[!] Warning: Could not save model due to compatibility issue. Model will work but won't be saved locally.") |
|
|
else: |
|
|
print(f"[!] Warning: Could not save model: {import_err}") |
|
|
except Exception as save_err: |
|
|
print(f"[!] Warning: Could not save model locally: {save_err}") |
|
|
|
|
|
except Exception as e2: |
|
|
print(f"Pipeline loading also failed: {e2}") |
|
|
raise |
|
|
|
|
|
def load_embedder(self, model_name: str = "all-MiniLM-L6-v2"): |
|
|
"""Load sentence transformer for embeddings, saving to local directory""" |
|
|
embedder_dir = os.path.join(self.model_dir, "embedder", model_name.replace("/", "_")) |
|
|
os.makedirs(embedder_dir, exist_ok=True) |
|
|
|
|
|
print(f"Loading embedding model: {model_name}...") |
|
|
print(f"Embedder will be cached in: {embedder_dir}") |
|
|
|
|
|
|
|
|
config_path = os.path.join(embedder_dir, "config.json") |
|
|
has_model_files = False |
|
|
if os.path.exists(config_path): |
|
|
|
|
|
model_files = glob.glob(os.path.join(embedder_dir, "*.safetensors")) + \ |
|
|
glob.glob(os.path.join(embedder_dir, "pytorch_model.bin")) |
|
|
if model_files or os.path.exists(os.path.join(embedder_dir, "model.safetensors.index.json")): |
|
|
has_model_files = True |
|
|
|
|
|
if has_model_files: |
|
|
print(f"Loading embedder from local cache: {embedder_dir}") |
|
|
self.embedder = SentenceTransformer(embedder_dir) |
|
|
else: |
|
|
print("Downloading embedder from HuggingFace...") |
|
|
self.embedder = SentenceTransformer(model_name, cache_folder=embedder_dir) |
|
|
|
|
|
try: |
|
|
self.embedder.save(embedder_dir) |
|
|
print(f"[OK] Embedder saved to {embedder_dir}") |
|
|
except ImportError as import_err: |
|
|
if "DTensor" in str(import_err): |
|
|
print(f"[!] Warning: Could not save embedder due to PyTorch/transformers compatibility issue: {import_err}") |
|
|
print("This is a known issue with certain versions. Embedder will work but won't be saved locally.") |
|
|
print("Continuing without local save...") |
|
|
else: |
|
|
print(f"[!] Warning: Could not save embedder: {import_err}") |
|
|
except Exception as save_err: |
|
|
print(f"[!] Warning: Could not save embedder locally: {save_err}") |
|
|
print("Continuing without local save...") |
|
|
|
|
|
print("[OK] Embedding model loaded!") |
|
|
|
|
|
def build_faiss_index(self, documents: List[str], metadata: List[Dict] = None): |
|
|
""" |
|
|
Build FAISS index from documents |
|
|
|
|
|
Args: |
|
|
documents: List of text documents to index |
|
|
metadata: Optional metadata for each document |
|
|
""" |
|
|
if not self.embedder: |
|
|
self.load_embedder() |
|
|
|
|
|
print(f"Building FAISS index for {len(documents)} documents...") |
|
|
|
|
|
|
|
|
embeddings = self.embedder.encode(documents, show_progress_bar=True) |
|
|
embeddings = np.array(embeddings).astype('float32') |
|
|
|
|
|
|
|
|
dimension = embeddings.shape[1] |
|
|
|
|
|
|
|
|
self.index = faiss.IndexFlatL2(dimension) |
|
|
|
|
|
|
|
|
self.index.add(embeddings) |
|
|
|
|
|
|
|
|
self.documents = documents |
|
|
self.metadata = metadata if metadata else [{}] * len(documents) |
|
|
|
|
|
print(f"[OK] FAISS index built with {self.index.ntotal} vectors") |
|
|
|
|
|
def build_index_from_json(self, json_data: Dict[str, Any]): |
|
|
"""Build FAISS index from exported JSON data""" |
|
|
documents = [] |
|
|
metadata = [] |
|
|
|
|
|
|
|
|
for room in json_data.get("all_rooms", []): |
|
|
|
|
|
room_text = self._room_to_text(room) |
|
|
documents.append(room_text) |
|
|
metadata.append({ |
|
|
"type": "room", |
|
|
"room_id": room.get("room_id"), |
|
|
"data": room |
|
|
}) |
|
|
|
|
|
|
|
|
for route in json_data.get("evacuation_routes", {}).get("routes", []): |
|
|
route_text = self._route_to_text(route) |
|
|
documents.append(route_text) |
|
|
metadata.append({ |
|
|
"type": "route", |
|
|
"route_id": route.get("route_id"), |
|
|
"exit": route.get("exit"), |
|
|
"data": route |
|
|
}) |
|
|
|
|
|
|
|
|
self.build_faiss_index(documents, metadata) |
|
|
|
|
|
def _room_to_text(self, room: Dict[str, Any]) -> str: |
|
|
"""Convert room data to searchable text""" |
|
|
sensor = room.get("sensor_data", {}) |
|
|
|
|
|
text_parts = [ |
|
|
f"Room {room.get('room_id')} ({room.get('name')})", |
|
|
f"Type: {room.get('room_type')}", |
|
|
] |
|
|
|
|
|
if room.get("has_oxygen_cylinder"): |
|
|
            text_parts.append("[!] OXYGEN CYLINDER PRESENT - EXPLOSION RISK")
|
|
|
|
|
if sensor.get("fire_detected"): |
|
|
text_parts.append("[FIRE] FIRE DETECTED") |
|
|
|
|
|
text_parts.extend([ |
|
|
f"Temperature: {sensor.get('temperature_c')}°C", |
|
|
f"Smoke level: {sensor.get('smoke_level')}", |
|
|
f"Oxygen: {sensor.get('oxygen_pct')}%", |
|
|
f"Visibility: {sensor.get('visibility_pct')}%", |
|
|
f"Structural integrity: {sensor.get('structural_integrity_pct')}%", |
|
|
f"Danger score: {sensor.get('danger_score')}", |
|
|
f"Passable: {sensor.get('passable')}" |
|
|
]) |
|
|
|
|
|
if sensor.get("carbon_monoxide_ppm", 0) > 50: |
|
|
            text_parts.append(f"[!] HIGH CARBON MONOXIDE: {sensor.get('carbon_monoxide_ppm')} ppm")
|
|
|
|
|
if sensor.get("flashover_risk", 0) > 0.5: |
|
|
            text_parts.append(f"[!] FLASHOVER RISK: {sensor.get('flashover_risk')*100:.0f}%")
|
|
|
|
|
if not sensor.get("exit_accessible", True): |
|
|
            text_parts.append("[!] EXIT BLOCKED")
|
|
|
|
|
if sensor.get("occupancy_density", 0) > 0.7: |
|
|
            text_parts.append(f"[!] HIGH CROWD DENSITY: {sensor.get('occupancy_density')*100:.0f}%")
|
|
|
|
|
return " | ".join(text_parts) |
|
|
|
|
|
def _route_to_text(self, route: Dict[str, Any]) -> str: |
|
|
"""Convert route data to searchable text""" |
|
|
metrics = route.get("metrics", {}) |
|
|
|
|
|
text_parts = [ |
|
|
f"{route.get('route_id')} to {route.get('exit')}", |
|
|
f"Path: {' → '.join(route.get('path', []))}", |
|
|
f"Average danger: {metrics.get('avg_danger')}", |
|
|
f"Max danger: {metrics.get('max_danger')} at {metrics.get('max_danger_location')}", |
|
|
f"Passable: {metrics.get('passable')}", |
|
|
f"Has fire: {metrics.get('has_fire')}", |
|
|
f"Has oxygen hazard: {metrics.get('has_oxygen_hazard')}" |
|
|
] |
|
|
|
|
|
risk_factors = metrics.get("risk_factors", []) |
|
|
if risk_factors: |
|
|
text_parts.append(f"Risks: {', '.join(risk_factors[:3])}") |
|
|
|
|
|
return " | ".join(text_parts) |
|
|
|
|
|
def search(self, query: str, k: int = 5) -> List[Dict[str, Any]]: |
|
|
""" |
|
|
Search FAISS index for relevant documents |
|
|
|
|
|
Args: |
|
|
query: Search query |
|
|
k: Number of results to return |
|
|
|
|
|
Returns: |
|
|
List of relevant documents with metadata |
|
|
""" |
|
|
if not self.index or not self.embedder: |
|
|
raise ValueError("Index not built. Call build_faiss_index() first.") |
|
|
|
|
|
|
|
|
query_embedding = self.embedder.encode([query]) |
|
|
query_embedding = np.array(query_embedding).astype('float32') |
|
|
|
|
|
|
|
|
distances, indices = self.index.search(query_embedding, k) |
|
|
|
|
|
|
|
|
results = [] |
|
|
for i, idx in enumerate(indices[0]): |
|
|
if idx < len(self.documents): |
|
|
results.append({ |
|
|
"document": self.documents[idx], |
|
|
"metadata": self.metadata[idx], |
|
|
"distance": float(distances[0][i]) |
|
|
}) |
|
|
|
|
|
return results |
|
|
|
|
|
def _build_cot_prompt(self, query: str, context: List[str]) -> str: |
|
|
"""Build Chain-of-Thought prompt with step-by-step reasoning""" |
|
|
context_text = "\n".join([f"- {ctx}" for ctx in context]) |
|
|
|
|
|
prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to answer the question concisely. |
|
|
|
|
|
CONTEXT: |
|
|
{context_text} |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
Think step by step, then provide a brief answer: |
|
|
|
|
|
REASONING: |
|
|
1. Analyze available information |
|
|
2. Identify key safety factors |
|
|
3. Evaluate risks and prioritize |
|
|
4. Conclude with recommendation |
|
|
|
|
|
ANSWER:""" |
|
|
return prompt |
|
|
|
|
|
def _build_tot_prompt(self, query: str, context: List[str], thought: str = "") -> str: |
|
|
"""Build Tree-of-Thoughts prompt for exploring multiple reasoning paths""" |
|
|
context_text = "\n".join([f"- {ctx}" for ctx in context]) |
|
|
|
|
|
if not thought: |
|
|
prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to explore different reasoning approaches. |
|
|
|
|
|
CONTEXT: |
|
|
{context_text} |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
Let's explore different reasoning approaches to solve this problem: |
|
|
|
|
|
APPROACH 1 - Safety-First Analysis: |
|
|
""" |
|
|
else: |
|
|
prompt = f"""CONTEXT: |
|
|
{context_text} |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
CURRENT THOUGHT: {thought} |
|
|
|
|
|
Evaluate this thought: |
|
|
- Is this reasoning sound? |
|
|
- What are the strengths and weaknesses? |
|
|
- What alternative approaches should we consider? |
|
|
|
|
|
EVALUATION: |
|
|
""" |
|
|
return prompt |
|
|
|
|
|
def _build_reflexion_prompt(self, query: str, context: List[str], previous_answer: str = "", |
|
|
reflection: str = "") -> str: |
|
|
"""Build Reflexion prompt for self-reflection and improvement""" |
|
|
context_text = "\n".join([f"- {ctx}" for ctx in context]) |
|
|
|
|
|
if not previous_answer: |
|
|
|
|
|
prompt = f"""You are an expert fire evacuation safety advisor. Use the following context to answer the question. |
|
|
|
|
|
CONTEXT: |
|
|
{context_text} |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
Provide a clear, safety-focused answer based on the context. |
|
|
|
|
|
ANSWER:""" |
|
|
else: |
|
|
|
|
|
prompt = f"""You are an expert fire evacuation safety advisor. Review and improve your previous answer. |
|
|
|
|
|
CONTEXT: |
|
|
{context_text} |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
PREVIOUS ANSWER: |
|
|
{previous_answer} |
|
|
|
|
|
REFLECTION: |
|
|
{reflection} |
|
|
|
|
|
Now provide an improved answer based on your reflection: |
|
|
|
|
|
IMPROVED ANSWER:""" |
|
|
return prompt |
|
|
|
|
|
def _build_cot_with_tools_prompt(self, query: str, context: List[str], tool_results: List[str] = None) -> str: |
|
|
"""Build Chain-of-Thought prompt with tool integration""" |
|
|
context_text = "\n".join([f"- {ctx}" for ctx in context]) |
|
|
|
|
|
tool_text = "" |
|
|
if tool_results: |
|
|
tool_text = "\nTOOL RESULTS:\n" + "\n".join([f"- {result}" for result in tool_results]) |
|
|
|
|
|
prompt = f"""You are an expert fire evacuation safety advisor. Use the following context and tool results to answer the question. |
|
|
|
|
|
CONTEXT: |
|
|
{context_text} |
|
|
{tool_text} |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
Let's solve this step by step, using both the context and tool results: |
|
|
|
|
|
STEP 1 - Understand the question and available data: |
|
|
""" |
|
|
return prompt |
|
|
|
|
|
def _generate_with_decoding_strategy(self, prompt: str, max_length: int = 500, |
|
|
temperature: float = 0.7, top_p: float = 0.9, |
|
|
num_beams: int = 3, stop_sequences: List[str] = None) -> str: |
|
|
"""Generate response using specified decoding strategy""" |
|
|
if not self.pipe and not self.model: |
|
|
raise ValueError("Model not loaded. Call download_model() first.") |
|
|
|
|
|
try: |
|
|
if self.use_unsloth and self.model: |
|
|
inputs = self.tokenizer( |
|
|
prompt, |
|
|
return_tensors="pt", |
|
|
truncation=True, |
|
|
max_length=self.max_seq_length |
|
|
).to(self.model.device) |
|
|
|
|
|
|
|
|
gen_kwargs = { |
|
|
"max_new_tokens": max_length, |
|
|
"pad_token_id": self.tokenizer.eos_token_id, |
|
|
"eos_token_id": self.tokenizer.eos_token_id, |
|
|
} |
|
|
|
|
|
if self.decoding_strategy == DecodingStrategy.GREEDY: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": False, |
|
|
"num_beams": 1 |
|
|
}) |
|
|
elif self.decoding_strategy == DecodingStrategy.SAMPLING: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": True, |
|
|
"temperature": temperature, |
|
|
"top_k": 50 |
|
|
}) |
|
|
elif self.decoding_strategy == DecodingStrategy.BEAM_SEARCH: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": False, |
|
|
"num_beams": num_beams, |
|
|
"early_stopping": True |
|
|
}) |
|
|
elif self.decoding_strategy == DecodingStrategy.NUCLEUS: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": True, |
|
|
"temperature": temperature, |
|
|
"top_p": top_p, |
|
|
"top_k": 0 |
|
|
}) |
|
|
elif self.decoding_strategy == DecodingStrategy.TEMPERATURE: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": True, |
|
|
"temperature": temperature |
|
|
}) |
|
|
|
|
|
with torch.no_grad(): |
|
|
outputs = self.model.generate(**inputs, **gen_kwargs) |
|
|
|
|
|
response = self.tokenizer.batch_decode( |
|
|
outputs, |
|
|
skip_special_tokens=True |
|
|
)[0] |
|
|
|
|
|
|
|
|
if prompt in response: |
|
|
response = response.split(prompt)[-1].strip() |
|
|
|
|
|
|
|
|
stop_phrases = [ |
|
|
"\n\nHowever, please note", |
|
|
"\n\nAdditionally,", |
|
|
"\n\nLet me know", |
|
|
"\n\nIf you have", |
|
|
"\n\nHere's another", |
|
|
"\n\nQUESTION:", |
|
|
"\n\nLet's break", |
|
|
"\n\nHave a great day", |
|
|
"\n\nI'm here to help" |
|
|
] |
|
|
for phrase in stop_phrases: |
|
|
if phrase in response: |
|
|
response = response.split(phrase)[0].strip() |
|
|
break |
|
|
|
|
|
return response |
|
|
else: |
|
|
|
|
|
gen_kwargs = { |
|
|
"max_length": len(self.tokenizer.encode(prompt)) + max_length, |
|
|
"num_return_sequences": 1, |
|
|
} |
|
|
|
|
|
if self.decoding_strategy == DecodingStrategy.GREEDY: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": False |
|
|
}) |
|
|
elif self.decoding_strategy == DecodingStrategy.SAMPLING: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": True, |
|
|
"temperature": temperature, |
|
|
"top_k": 50 |
|
|
}) |
|
|
elif self.decoding_strategy == DecodingStrategy.BEAM_SEARCH: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": False, |
|
|
"num_beams": num_beams, |
|
|
"early_stopping": True |
|
|
}) |
|
|
elif self.decoding_strategy == DecodingStrategy.NUCLEUS: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": True, |
|
|
"temperature": temperature, |
|
|
"top_p": top_p, |
|
|
"top_k": 0 |
|
|
}) |
|
|
elif self.decoding_strategy == DecodingStrategy.TEMPERATURE: |
|
|
gen_kwargs.update({ |
|
|
"do_sample": True, |
|
|
"temperature": temperature |
|
|
}) |
|
|
|
|
|
gen_kwargs["pad_token_id"] = self.tokenizer.eos_token_id if self.tokenizer else None |
|
|
|
|
|
outputs = self.pipe(prompt, **gen_kwargs) |
|
|
response = outputs[0]['generated_text'] |
|
|
|
|
|
|
|
|
if prompt in response: |
|
|
response = response.split(prompt)[-1].strip() |
|
|
|
|
|
|
|
|
stop_phrases = [ |
|
|
"\n\nHowever, please note", |
|
|
"\n\nAdditionally,", |
|
|
"\n\nLet me know", |
|
|
"\n\nIf you have", |
|
|
"\n\nHere's another", |
|
|
"\n\nQUESTION:", |
|
|
"\n\nLet's break", |
|
|
"\n\nHave a great day", |
|
|
"\n\nI'm here to help" |
|
|
] |
|
|
for phrase in stop_phrases: |
|
|
if phrase in response: |
|
|
response = response.split(phrase)[0].strip() |
|
|
break |
|
|
|
|
|
return response |
|
|
|
|
|
except Exception as e: |
|
|
return f"Error generating response: {e}" |
|
|
|
|
|
def _chain_of_thought_reasoning(self, query: str, context: List[str], max_length: int = 500) -> Tuple[str, str]: |
|
|
"""Generate response using Chain-of-Thought reasoning |
|
|
|
|
|
Returns: |
|
|
Tuple of (full_reasoning, final_answer) |
|
|
""" |
|
|
prompt = self._build_cot_prompt(query, context) |
|
|
|
|
|
full_response = self._generate_with_decoding_strategy(prompt, max_length=min(max_length, 300)) |
|
|
|
|
|
|
|
|
reasoning = "" |
|
|
if "REASONING:" in full_response: |
|
|
reasoning_parts = full_response.split("REASONING:") |
|
|
if len(reasoning_parts) > 1: |
|
|
reasoning_section = reasoning_parts[1].split("ANSWER:")[0] if "ANSWER:" in reasoning_parts[1] else reasoning_parts[1] |
|
|
reasoning = reasoning_section.strip() |
|
|
elif "ANSWER:" in full_response: |
|
|
reasoning = full_response.split("ANSWER:")[0].strip() |
|
|
else: |
|
|
|
|
|
lines = full_response.split('\n') |
|
|
reasoning_lines = [] |
|
|
for line in lines: |
|
|
if line.strip().startswith(('1.', '2.', '3.', '4.', '5.', 'Step', 'STEP')): |
|
|
reasoning_lines.append(line.strip()) |
|
|
elif "ANSWER" in line.upper(): |
|
|
break |
|
|
elif reasoning_lines: |
|
|
reasoning_lines.append(line.strip()) |
|
|
reasoning = '\n'.join(reasoning_lines) |
|
|
|
|
|
|
|
|
final_answer = full_response |
|
|
if "ANSWER:" in full_response: |
|
|
answer_parts = full_response.split("ANSWER:") |
|
|
if len(answer_parts) > 1: |
|
|
answer_text = answer_parts[-1].strip() |
|
|
|
|
|
stop_markers = [ |
|
|
"\n\nHowever, please note", |
|
|
"\n\nAdditionally,", |
|
|
"\n\nLet me know", |
|
|
"\n\nIf you have", |
|
|
"\n\nHere's another", |
|
|
"\n\nQUESTION:", |
|
|
"\n\nLet's break", |
|
|
"\n\nHave a great day", |
|
|
"\n\nI'm here to help", |
|
|
"\n\nThese general guidelines", |
|
|
"\n\nIf you have any further" |
|
|
] |
|
|
for marker in stop_markers: |
|
|
if marker in answer_text: |
|
|
answer_text = answer_text.split(marker)[0].strip() |
|
|
break |
|
|
|
|
|
sentences = answer_text.split('. ') |
|
|
if len(sentences) > 3: |
|
|
answer_text = '. '.join(sentences[:3]) |
|
|
if not answer_text.endswith('.'): |
|
|
answer_text += '.' |
|
|
final_answer = answer_text |
|
|
|
|
|
|
|
|
if reasoning: |
|
|
|
|
|
verbose_endings = [ |
|
|
"However, please note", |
|
|
"Additionally,", |
|
|
"Let me know", |
|
|
"If you have", |
|
|
"Here's another", |
|
|
"Have a great day", |
|
|
"I'm here to help" |
|
|
] |
|
|
for ending in verbose_endings: |
|
|
if ending in reasoning: |
|
|
reasoning = reasoning.split(ending)[0].strip() |
|
|
break |
|
|
|
|
|
return reasoning or "Reasoning steps generated", final_answer |
|
|
|
|
|
def _tree_of_thoughts_reasoning(self, query: str, context: List[str], max_length: int = 500, |
|
|
max_thoughts: int = 3) -> Tuple[str, str]: |
|
|
"""Generate response using Tree-of-Thoughts reasoning |
|
|
|
|
|
Returns: |
|
|
Tuple of (full_reasoning, final_answer) |
|
|
""" |
|
|
thoughts = [] |
|
|
reasoning_log = [] |
|
|
|
|
|
|
|
|
for i in range(max_thoughts): |
|
|
thought_prompt = self._build_tot_prompt(query, context, |
|
|
thought=f"Exploring approach {i+1}") |
|
|
thought = self._generate_with_decoding_strategy(thought_prompt, max_length // max_thoughts) |
|
|
thoughts.append(thought) |
|
|
reasoning_log.append(f"APPROACH {i+1}:\n{thought}\n") |
|
|
|
|
|
|
|
|
evaluation_prompt = f"""Evaluate these different reasoning approaches for answering the question: |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
APPROACHES: |
|
|
""" |
|
|
for i, thought in enumerate(thoughts, 1): |
|
|
evaluation_prompt += f"\nAPPROACH {i}:\n{thought}\n" |
|
|
|
|
|
evaluation_prompt += "\nWhich approach is most sound and complete? Provide the best answer based on the evaluation.\n\nBEST ANSWER:" |
|
|
|
|
|
final_response = self._generate_with_decoding_strategy(evaluation_prompt, max_length) |
|
|
|
|
|
full_reasoning = "\n".join(reasoning_log) + f"\n\nEVALUATION:\n{final_response}" |
|
|
return full_reasoning, final_response |
|
|
|
|
|
def _reflexion_reasoning(self, query: str, context: List[str], max_length: int = 500, |
|
|
max_iterations: int = 2) -> Tuple[str, str]: |
|
|
"""Generate response using Reflexion (self-reflection and improvement) |
|
|
|
|
|
Returns: |
|
|
Tuple of (full_reasoning, final_answer) |
|
|
""" |
|
|
reasoning_log = [] |
|
|
|
|
|
|
|
|
initial_prompt = self._build_reflexion_prompt(query, context) |
|
|
answer = self._generate_with_decoding_strategy(initial_prompt, max_length) |
|
|
reasoning_log.append(f"INITIAL ANSWER:\n{answer}\n") |
|
|
|
|
|
|
|
|
for iteration in range(max_iterations): |
|
|
|
|
|
reflection_prompt = f"""Review this answer for a fire evacuation safety question: |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
CURRENT ANSWER: |
|
|
{answer} |
|
|
|
|
|
What could be improved? Consider: |
|
|
- Accuracy of safety information |
|
|
- Completeness of the response |
|
|
- Clarity and actionability |
|
|
- Missing critical safety factors |
|
|
|
|
|
REFLECTION:""" |
|
|
|
|
|
reflection = self._generate_with_decoding_strategy(reflection_prompt, max_length // 2) |
|
|
reasoning_log.append(f"ITERATION {iteration + 1} - REFLECTION:\n{reflection}\n") |
|
|
|
|
|
|
|
|
improved_prompt = self._build_reflexion_prompt(query, context, answer, reflection) |
|
|
improved_answer = self._generate_with_decoding_strategy(improved_prompt, max_length) |
|
|
reasoning_log.append(f"ITERATION {iteration + 1} - IMPROVED ANSWER:\n{improved_answer}\n") |
|
|
|
|
|
|
|
|
if len(improved_answer) > len(answer) * 0.8: |
|
|
answer = improved_answer |
|
|
else: |
|
|
break |
|
|
|
|
|
self.reflexion_history.append({ |
|
|
"query": query, |
|
|
"final_answer": answer, |
|
|
"iterations": iteration + 1 |
|
|
}) |
|
|
|
|
|
full_reasoning = "\n".join(reasoning_log) |
|
|
return full_reasoning, answer |
|
|
|
|
|
def _cot_with_tools_reasoning(self, query: str, context: List[str], max_length: int = 500) -> Tuple[str, str]: |
|
|
"""Generate response using Chain-of-Thought with tool integration |
|
|
|
|
|
Returns: |
|
|
Tuple of (full_reasoning, final_answer) |
|
|
""" |
|
|
reasoning_log = [] |
|
|
|
|
|
|
|
|
tool_results = [] |
|
|
|
|
|
|
|
|
if "route" in query.lower() or "path" in query.lower(): |
|
|
tool_result = "Tool: Route Analyzer - Found 3 evacuation routes with risk scores" |
|
|
tool_results.append(tool_result) |
|
|
reasoning_log.append(f"TOOL CALL: {tool_result}\n") |
|
|
|
|
|
|
|
|
if "danger" in query.lower() or "risk" in query.lower(): |
|
|
tool_result = "Tool: Risk Calculator - Calculated danger scores for all rooms" |
|
|
tool_results.append(tool_result) |
|
|
reasoning_log.append(f"TOOL CALL: {tool_result}\n") |
|
|
|
|
|
|
|
|
if "sensor" in query.lower() or "temperature" in query.lower() or "smoke" in query.lower(): |
|
|
tool_result = "Tool: Sensor Aggregator - Aggregated sensor data from all rooms" |
|
|
tool_results.append(tool_result) |
|
|
reasoning_log.append(f"TOOL CALL: {tool_result}\n") |
|
|
|
|
|
prompt = self._build_cot_with_tools_prompt(query, context, tool_results) |
|
|
response = self._generate_with_decoding_strategy(prompt, max_length) |
|
|
|
|
|
reasoning_log.append(f"REASONING WITH TOOLS:\n{response}\n") |
|
|
full_reasoning = "\n".join(reasoning_log) |
|
|
|
|
|
|
|
|
        # Extract the text after the final "ANSWER:" marker (case-insensitive), if one exists
        final_answer = response
        parts = re.split(r"answer:", response, flags=re.IGNORECASE)
        if len(parts) > 1:
            final_answer = parts[-1].strip()
|
|
|
|
|
return full_reasoning, final_answer |
|
|
|
|
|
def generate_response(self, query: str, context: List[str] = None, max_length: int = 500, |
|
|
return_reasoning: bool = False) -> str: |
|
|
""" |
|
|
Generate response using Llama model with context and advanced reasoning |
|
|
|
|
|
Args: |
|
|
query: User query |
|
|
context: Optional context strings (if None, will retrieve from FAISS) |
|
|
max_length: Maximum response length |
|
|
return_reasoning: If True, returns tuple of (reasoning, answer), else just answer |
|
|
|
|
|
Returns: |
|
|
If return_reasoning is True: Tuple of (reasoning_steps, final_answer) |
|
|
Otherwise: Just the final answer string |
|
|
""" |
|
|
if not self.pipe and not self.model: |
|
|
raise ValueError("Model not loaded. Call download_model() first.") |
|
|
|
|
|
|
|
|
if context is None: |
|
|
search_results = self.search(query, k=3) |
|
|
context = [r["document"] for r in search_results] |
|
|
|
|
|
|
|
|
if self.reasoning_mode == ReasoningMode.CHAIN_OF_THOUGHT: |
|
|
reasoning, answer = self._chain_of_thought_reasoning(query, context, max_length) |
|
|
elif self.reasoning_mode == ReasoningMode.TREE_OF_THOUGHTS: |
|
|
reasoning, answer = self._tree_of_thoughts_reasoning(query, context, max_length) |
|
|
elif self.reasoning_mode == ReasoningMode.REFLEXION: |
|
|
reasoning, answer = self._reflexion_reasoning(query, context, max_length) |
|
|
elif self.reasoning_mode == ReasoningMode.COT_WITH_TOOLS: |
|
|
reasoning, answer = self._cot_with_tools_reasoning(query, context, max_length) |
|
|
else: |
|
|
|
|
|
context_text = "\n".join([f"- {ctx}" for ctx in context]) |
|
|
|
|
|
prompt = f"""You are an expert fire evacuation safety advisor. Use the following context about the building's fire safety status to answer the question. |
|
|
|
|
|
CONTEXT: |
|
|
{context_text} |
|
|
|
|
|
QUESTION: {query} |
|
|
|
|
|
Provide a clear, safety-focused answer based on the context. If the context doesn't contain enough information, say so. |
|
|
|
|
|
ANSWER:""" |
|
|
|
|
|
answer = self._generate_with_decoding_strategy(prompt, max_length) |
|
|
reasoning = f"Standard reasoning mode - Direct answer generation.\n\n{answer}" |
|
|
|
|
|
if return_reasoning: |
|
|
return reasoning, answer |
|
|
return answer |
|
|
|
|
|
def set_reasoning_mode(self, mode: ReasoningMode): |
|
|
"""Set the reasoning mode for future queries""" |
|
|
self.reasoning_mode = mode |
|
|
print(f"[OK] Reasoning mode set to: {mode.value}") |
|
|
|
|
|
def set_decoding_strategy(self, strategy: DecodingStrategy): |
|
|
"""Set the decoding strategy for future queries""" |
|
|
self.decoding_strategy = strategy |
|
|
print(f"[OK] Decoding strategy set to: {strategy.value}") |
|
|
|
|
|
def query(self, question: str, k: int = 3, reasoning_mode: Optional[ReasoningMode] = None, |
|
|
show_reasoning: bool = True) -> Dict[str, Any]: |
|
|
""" |
|
|
Complete RAG query: retrieve context and generate response with advanced reasoning |
|
|
|
|
|
Args: |
|
|
question: User question |
|
|
k: Number of context documents to retrieve |
|
|
reasoning_mode: Optional override for reasoning mode (uses instance default if None) |
|
|
show_reasoning: If True, includes full reasoning steps in response |
|
|
|
|
|
Returns: |
|
|
Dictionary with answer, context, metadata, reasoning information, and reasoning steps |
|
|
""" |
|
|
|
|
|
search_results = self.search(question, k=k) |
|
|
|
|
|
|
|
|
context = [r["document"] for r in search_results] |
|
|
|
|
|
|
|
|
original_mode = self.reasoning_mode |
|
|
if reasoning_mode is not None: |
|
|
self.reasoning_mode = reasoning_mode |
|
|
|
|
|
try: |
|
|
reasoning, answer = self.generate_response(question, context, return_reasoning=True) |
|
|
finally: |
|
|
|
|
|
self.reasoning_mode = original_mode |
|
|
|
|
|
        result = {
            "question": question,
            "answer": answer,
            "context": context,
            "reasoning_mode": (reasoning_mode or self.reasoning_mode).value,  # mode actually used for this query
|
|
"decoding_strategy": self.decoding_strategy.value, |
|
|
"sources": [ |
|
|
{ |
|
|
"type": r["metadata"].get("type"), |
|
|
"room_id": r["metadata"].get("room_id"), |
|
|
"route_id": r["metadata"].get("route_id"), |
|
|
"relevance_score": 1.0 / (1.0 + r["distance"]) |
|
|
} |
|
|
for r in search_results |
|
|
] |
|
|
} |
|
|
|
|
|
if show_reasoning: |
|
|
result["reasoning_steps"] = reasoning |
|
|
|
|
|
return result |
|
|
|
|
|
def save_index(self, index_path: str, metadata_path: str): |
|
|
"""Save FAISS index and metadata""" |
|
|
if self.index: |
|
|
faiss.write_index(self.index, index_path) |
|
|
with open(metadata_path, 'wb') as f: |
|
|
pickle.dump({ |
|
|
"documents": self.documents, |
|
|
"metadata": self.metadata |
|
|
}, f) |
|
|
print(f"[OK] Saved index to {index_path} and metadata to {metadata_path}") |
|
|
|
|
|
def load_index(self, index_path: str, metadata_path: str): |
|
|
"""Load FAISS index and metadata""" |
|
|
self.index = faiss.read_index(index_path) |
|
|
with open(metadata_path, 'rb') as f: |
|
|
data = pickle.load(f) |
|
|
self.documents = data["documents"] |
|
|
self.metadata = data["metadata"] |
|
|
print(f"[OK] Loaded index with {self.index.ntotal} vectors") |
|
|
|
|
|
def compare_reasoning_modes(self, question: str, k: int = 3) -> Dict[str, Any]: |
|
|
""" |
|
|
Compare all reasoning modes for a given question |
|
|
|
|
|
Args: |
|
|
question: User question |
|
|
k: Number of context documents to retrieve |
|
|
|
|
|
Returns: |
|
|
Dictionary with answers from all reasoning modes |
|
|
""" |
|
|
|
|
|
search_results = self.search(question, k=k) |
|
|
context = [r["document"] for r in search_results] |
|
|
|
|
|
results = { |
|
|
"question": question, |
|
|
"context": context, |
|
|
"sources": [ |
|
|
{ |
|
|
"type": r["metadata"].get("type"), |
|
|
"room_id": r["metadata"].get("room_id"), |
|
|
"route_id": r["metadata"].get("route_id"), |
|
|
"relevance_score": 1.0 / (1.0 + r["distance"]) |
|
|
} |
|
|
for r in search_results |
|
|
], |
|
|
"answers": {} |
|
|
} |
|
|
|
|
|
|
|
|
original_mode = self.reasoning_mode |
|
|
|
|
|
|
|
|
for mode in ReasoningMode: |
|
|
try: |
|
|
self.reasoning_mode = mode |
|
|
reasoning, answer = self.generate_response(question, context, return_reasoning=True) |
|
|
results["answers"][mode.value] = { |
|
|
"answer": answer, |
|
|
"reasoning": reasoning, |
|
|
"length": len(answer) |
|
|
} |
|
|
except Exception as e: |
|
|
results["answers"][mode.value] = { |
|
|
"error": str(e) |
|
|
} |
|
|
|
|
|
|
|
|
self.reasoning_mode = original_mode |
|
|
|
|
|
return results |
|
|
|
|
|
|
|
|
_rag_instance: Optional[FireEvacuationRAG] = None |
|
|
|
|
|
|
|
|
def _init_rag() -> FireEvacuationRAG: |
|
|
"""Initialize and cache the RAG system for Gradio use.""" |
|
|
global _rag_instance |
|
|
if _rag_instance is not None: |
|
|
return _rag_instance |
|
|
|
|
|
|
|
|
USE_UNSLOTH = True |
|
|
USE_8BIT = False |
|
|
UNSLOTH_MODEL = "unsloth/Meta-Llama-3.1-8B-Instruct" |
|
|
|
|
|
|
|
|
MODEL_DIR = r"D:\github\cse499\models" |
|
|
|
|
|
|
|
|
floor_plan = create_sample_floor_plan() |
|
|
sensor_system = create_sample_fire_scenario(floor_plan) |
|
|
pathfinder = PathFinder(floor_plan, sensor_system) |
|
|
|
|
|
|
|
|
exporter = FireEvacuationDataExporter(floor_plan, sensor_system, pathfinder) |
|
|
json_data = exporter.export_to_json("fire_evacuation_data.json", start_location="R1") |
|
|
|
|
|
|
|
|
if USE_UNSLOTH: |
|
|
rag = FireEvacuationRAG( |
|
|
model_name=UNSLOTH_MODEL, |
|
|
model_dir=MODEL_DIR, |
|
|
use_unsloth=True, |
|
|
load_in_4bit=False, |
|
|
max_seq_length=2048, |
|
|
reasoning_mode=ReasoningMode.CHAIN_OF_THOUGHT, |
|
|
decoding_strategy=DecodingStrategy.NUCLEUS, |
|
|
) |
|
|
else: |
|
|
rag = FireEvacuationRAG( |
|
|
model_name="nvidia/Llama-3.1-Minitron-4B-Width-Base", |
|
|
model_dir=MODEL_DIR, |
|
|
use_8bit=USE_8BIT, |
|
|
reasoning_mode=ReasoningMode.CHAIN_OF_THOUGHT, |
|
|
decoding_strategy=DecodingStrategy.NUCLEUS, |
|
|
) |
|
|
|
|
|
rag.download_model() |
|
|
rag.load_embedder() |
|
|
rag.build_index_from_json(json_data) |
|
|
|
|
|
_rag_instance = rag |
|
|
return rag |
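
# Illustrative sketch (comments only, not executed): the cached RAG instance can also be
# used programmatically, e.g. to override the decoding strategy or to compare all
# reasoning modes on one question. The question below is only an example.
#
#   rag = _init_rag()
#   rag.set_decoding_strategy(DecodingStrategy.BEAM_SEARCH)
#   comparison = rag.compare_reasoning_modes("Which exit is safest from R1?", k=3)
#   for mode, info in comparison["answers"].items():
#       print(f"{mode}: {info.get('answer', info.get('error'))}")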
|
|
|
|
|
|
|
|
def gradio_answer(question: str) -> str: |
|
|
"""Gradio callback: take a text question, return LLM/RAG answer.""" |
|
|
question = (question or "").strip() |
|
|
if not question: |
|
|
return "Please enter a question about fire evacuation or building safety." |
|
|
|
|
|
rag = _init_rag() |
|
|
result = rag.query(question, k=3, show_reasoning=False) |
|
|
return result.get("answer", "No answer generated.") |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
iface = gr.Interface( |
|
|
fn=gradio_answer, |
|
|
inputs=gr.Textbox(lines=3, label="Fire Evacuation Question"), |
|
|
outputs=gr.Textbox(lines=6, label="LLM Recommendation"), |
|
|
title="Fire Evacuation RAG Advisor", |
|
|
description="Ask about evacuation routes, dangers, and exits in the simulated building.", |
|
|
) |
|
|
iface.launch() |
|
|
|
|
|
|