""" Pathfinding and Risk Assessment Module Uses A* algorithm with custom risk-based heuristics """ import heapq from typing import Dict, List, Tuple, Optional from .floor_plan import FloorPlan from .sensor_system import SensorSystem, SensorReading class PathNode: """Node for pathfinding algorithm""" def __init__(self, room_id: str, g_cost: float, h_cost: float, parent=None): self.room_id = room_id self.g_cost = g_cost # Cost from start self.h_cost = h_cost # Heuristic cost to goal self.f_cost = g_cost + h_cost # Total cost self.parent = parent def __lt__(self, other): return self.f_cost < other.f_cost def __eq__(self, other): return self.room_id == other.room_id class RiskAssessment: """Assesses risk for different evacuation routes""" @staticmethod def calculate_path_risk(path: List[str], sensor_system: SensorSystem, floor_plan: FloorPlan) -> Dict: """ Calculate comprehensive risk assessment for a path Returns dict with: - total_danger: Overall danger score - max_danger: Maximum danger point - avg_danger: Average danger - has_fire: Whether path goes through fire - has_oxygen_hazard: Whether path has oxygen cylinder - passable: Whether path is passable - risk_factors: List of specific risks """ if not path: return {"passable": False, "total_danger": 100.0} danger_scores = [] risk_factors = [] has_fire = False has_oxygen_hazard = False max_danger_location = None max_danger_score = 0.0 for room_id in path: sensor = sensor_system.get_sensor_reading(room_id) room = floor_plan.get_room(room_id) if sensor: danger = sensor.calculate_danger_score() danger_scores.append(danger) if danger > max_danger_score: max_danger_score = danger max_danger_location = room_id # Check specific hazards if sensor.fire_detected: has_fire = True risk_factors.append(f"Fire detected in {room_id}") if room and room.has_oxygen_cylinder: has_oxygen_hazard = True # Increase danger if there's heat near oxygen if sensor.temperature > 40 or sensor.fire_detected: danger_scores[-1] += 15 # Add explosion risk risk_factors.append(f"Oxygen cylinder explosion risk in {room_id}") else: risk_factors.append(f"Oxygen cylinder present in {room_id}") if sensor.smoke_level > 0.5: risk_factors.append(f"Heavy smoke in {room_id}") if sensor.temperature > 60: risk_factors.append(f"High temperature ({sensor.temperature:.1f}°C) in {room_id}") if sensor.oxygen_level < 19.5: risk_factors.append(f"Low oxygen ({sensor.oxygen_level:.1f}%) in {room_id}") # NEW: Toxic gas warnings if sensor.carbon_monoxide > 50: risk_factors.append(f"TOXIC CO ({sensor.carbon_monoxide:.0f} ppm) in {room_id}") if sensor.carbon_dioxide > 5000: risk_factors.append(f"High CO2 ({sensor.carbon_dioxide:.0f} ppm) in {room_id}") if sensor.hydrogen_cyanide > 20: risk_factors.append(f"TOXIC HCN ({sensor.hydrogen_cyanide:.1f} ppm) in {room_id}") # NEW: Flashover/backdraft warnings if sensor.flashover_risk > 0.7: risk_factors.append(f"CRITICAL: Flashover risk ({sensor.flashover_risk*100:.0f}%) in {room_id}") if sensor.backdraft_risk > 0.6: risk_factors.append(f"CRITICAL: Backdraft risk ({sensor.backdraft_risk*100:.0f}%) in {room_id}") # NEW: Crowd density warnings if sensor.occupancy_density > 0.8: risk_factors.append(f"High crowd density ({sensor.occupancy_density*100:.0f}%) in {room_id}") # NEW: Infrastructure failures if not sensor.exit_accessible: risk_factors.append(f"Exit BLOCKED in {room_id}") if not sensor.stairwell_clear: risk_factors.append(f"Stairwell BLOCKED in {room_id}") if not sensor.emergency_lighting: risk_factors.append(f"No emergency lighting in {room_id}") # NEW: Time pressure if sensor.time_since_fire_start > 300: risk_factors.append(f"Time pressure: {sensor.time_since_fire_start//60} min elapsed") if not sensor.is_passable(): risk_factors.append(f"Path blocked at {room_id}") total_danger = sum(danger_scores) avg_danger = total_danger / len(danger_scores) if danger_scores else 0 # Check if all segments are passable passable = all(sensor_system.get_sensor_reading(rid).is_passable() for rid in path if sensor_system.get_sensor_reading(rid)) return { "total_danger": total_danger, "max_danger": max_danger_score, "max_danger_location": max_danger_location, "avg_danger": avg_danger, "path_length": len(path), "has_fire": has_fire, "has_oxygen_hazard": has_oxygen_hazard, "passable": passable, "risk_factors": risk_factors, "danger_scores": danger_scores } @staticmethod def get_risk_level(avg_danger: float) -> str: """Get risk level description""" if avg_danger < 20: return "LOW" elif avg_danger < 40: return "MODERATE" elif avg_danger < 60: return "HIGH" else: return "CRITICAL" @staticmethod def recommend_path(paths: List[Tuple[List[str], Dict]]) -> Optional[Tuple[List[str], Dict]]: """ Recommend the best path based on risk assessment Args: paths: List of (path, risk_assessment) tuples Returns: Best (path, risk_assessment) tuple or None """ if not paths: return None # Filter to only passable paths passable_paths = [(p, r) for p, r in paths if r["passable"]] if not passable_paths: # No fully passable paths, return least dangerous return min(paths, key=lambda x: x[1]["total_danger"]) # Score each path (lower is better) def score_path(path_info): path, risk = path_info score = 0 # Heavily penalize fire if risk["has_fire"]: score += 100 # Add oxygen hazard risk (but less than fire) if risk["has_oxygen_hazard"]: score += 30 # Add danger scores score += risk["total_danger"] # Prefer shorter paths (slight preference) score += risk["path_length"] * 2 # Penalize high max danger points score += risk["max_danger"] * 0.5 return score # Return path with lowest score return min(passable_paths, key=score_path) class PathFinder: """Find optimal evacuation paths using A* algorithm with risk assessment""" def __init__(self, floor_plan: FloorPlan, sensor_system: SensorSystem): self.floor_plan = floor_plan self.sensor_system = sensor_system def find_all_evacuation_routes(self, start: str) -> List[Tuple[str, List[str], Dict]]: """ Find evacuation routes to all exits Returns: List of (exit_id, path, risk_assessment) tuples """ routes = [] exits = self.floor_plan.get_all_exits() for exit_id in exits: path = self.find_path(start, exit_id) if path: risk = RiskAssessment.calculate_path_risk( path, self.sensor_system, self.floor_plan ) routes.append((exit_id, path, risk)) return routes def find_path(self, start: str, goal: str) -> Optional[List[str]]: """ Find path from start to goal using A* with risk-based costs Returns: List of room IDs representing the path, or None if no path exists """ if start not in self.floor_plan.rooms or goal not in self.floor_plan.rooms: return None # Priority queue: (f_cost, node) open_set = [] closed_set = set() # Initialize start node start_node = PathNode(start, 0, self._heuristic(start, goal)) heapq.heappush(open_set, start_node) # Track best g_cost for each room g_costs = {start: 0} while open_set: current = heapq.heappop(open_set) # Goal reached if current.room_id == goal: return self._reconstruct_path(current) # Skip if already processed if current.room_id in closed_set: continue closed_set.add(current.room_id) # Explore neighbors neighbors = self.floor_plan.get_neighbors(current.room_id) for neighbor_id, base_distance in neighbors: if neighbor_id in closed_set: continue # Calculate risk-adjusted cost risk_cost = self._calculate_risk_cost(neighbor_id) tentative_g = current.g_cost + base_distance + risk_cost # If this path is better, add to open set if neighbor_id not in g_costs or tentative_g < g_costs[neighbor_id]: g_costs[neighbor_id] = tentative_g h_cost = self._heuristic(neighbor_id, goal) neighbor_node = PathNode(neighbor_id, tentative_g, h_cost, current) heapq.heappush(open_set, neighbor_node) # No path found return None def _heuristic(self, room_id1: str, room_id2: str) -> float: """ Heuristic function for A* (Manhattan distance between room positions) """ room1 = self.floor_plan.get_room(room_id1) room2 = self.floor_plan.get_room(room_id2) if not room1 or not room2: return 0 x1, y1 = room1.position x2, y2 = room2.position return abs(x2 - x1) + abs(y2 - y1) def _calculate_risk_cost(self, room_id: str) -> float: """ Calculate risk-based cost for traversing a room with all real-world factors Higher danger = higher cost """ sensor = self.sensor_system.get_sensor_reading(room_id) if not sensor: return 0.0 danger_score = sensor.calculate_danger_score() # Convert danger score to cost multiplier # Danger 0-20: minimal cost # Danger 20-50: moderate cost # Danger 50+: high cost risk_cost = danger_score / 10.0 # Extra penalty for fire if sensor.fire_detected: risk_cost += 20.0 # Extra penalty for oxygen cylinder in dangerous conditions room = self.floor_plan.get_room(room_id) if room and room.has_oxygen_cylinder: if sensor.temperature > 40 or sensor.fire_detected: risk_cost += 10.0 # NEW: Extra penalties for critical factors # Toxic gas penalty if sensor.carbon_monoxide > 50: risk_cost += 15.0 if sensor.carbon_dioxide > 5000: risk_cost += 10.0 # Flashover risk penalty if sensor.flashover_risk > 0.7: risk_cost += 25.0 elif sensor.flashover_risk > 0.5: risk_cost += 15.0 # Exit blockage penalty if not sensor.exit_accessible: risk_cost += 30.0 if not sensor.stairwell_clear: risk_cost += 20.0 # Crowd density penalty (slows movement) if sensor.occupancy_density > 0.8: risk_cost += sensor.occupancy_density * 20.0 # Infrastructure failure penalty if not sensor.emergency_lighting: risk_cost += 5.0 # Make impassable areas very expensive (but not infinite) if not sensor.is_passable(): risk_cost += 50.0 return risk_cost def _reconstruct_path(self, node: PathNode) -> List[str]: """Reconstruct path from goal node back to start""" path = [] current = node while current: path.append(current.room_id) current = current.parent return list(reversed(path))