Spaces:
Running
Running
| """ | |
| ================================================================================ | |
| UNIFIED AI REASONING BRAIN – CORE SYSTEM (PROPRIETARY) | |
| ================================================================================ | |
| © 2025 Solomon 8888. All Rights Reserved. | |
| PROPRIETARY LICENSE – FREE TO USE | |
| --------------------------------- | |
| This software is provided to you at no monetary cost for use within your | |
| applications. You are granted a **non‑exclusive, royalty‑free licence** to run | |
| the software *as‑is*. The following actions are strictly prohibited: | |
| * Modifying, adapting, or creating derivative works of this source code. | |
| * Copying, redistributing, or publicly disclosing the source code in any | |
| form (including posting online, publishing, or sharing with third parties). | |
| * Sublicensing, selling, or transferring the software to anyone else. | |
| The source code is considered confidential and proprietary. Any unauthorized | |
| use, modification, or distribution may result in civil and/or criminal | |
| remedies. | |
| Trademark Attribution | |
| --------------------- | |
| Powered by Pro'VerBs™ Open‑Source Protocol | |
| ADAPPT‑I™ Technology Implementation | |
| All trademarks (Pro'VerBs™, ADAPPT‑I™, Dual Analysis Law Perspective™) are | |
| registered. Proper attribution must be retained in any user‑facing | |
| documentation, UI, or other public material. | |
| ================================================================================ | |
| """ | |
| import json | |
| import asyncio | |
| from abc import ABC, abstractmethod | |
| from typing import Any, Dict, List, Optional, Callable, Union | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| import logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| # ============================================================================ | |
| # CORE DATA STRUCTURES | |
| # ============================================================================ | |
| class ProtocolCategory(Enum): | |
| """Categories of reasoning protocols""" | |
| CORE_REASONING = "core_reasoning" | |
| QUANTUM_SPECIFIC = "quantum_specific" | |
| MULTI_AGENT = "multi_agent" | |
| ADVANCED_IMPLEMENTATION = "advanced_implementation" | |
| VERIFICATION = "verification" | |
| OPTIMIZATION = "optimization" | |
| class ExecutionStatus(Enum): | |
| """Status of protocol execution""" | |
| PENDING = "pending" | |
| RUNNING = "running" | |
| SUCCESS = "success" | |
| FAILED = "failed" | |
| CANCELLED = "cancelled" | |
| class ReasoningContext: | |
| """Context maintained across reasoning operations""" | |
| task_id: str | |
| query: str | |
| history: List[Dict[str, Any]] = field(default_factory=list) | |
| memory: Dict[str, Any] = field(default_factory=dict) | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| quantum_resources: Optional[Dict[str, Any]] = None | |
| class ProtocolResult: | |
| """Result from protocol execution""" | |
| protocol_name: str | |
| status: ExecutionStatus | |
| output: Any | |
| reasoning_trace: List[str] = field(default_factory=list) | |
| metadata: Dict[str, Any] = field(default_factory=dict) | |
| error: Optional[str] = None | |
| # ============================================================================ | |
| # BASE PROTOCOL INTERFACE | |
| # ============================================================================ | |
| class BaseProtocol(ABC): | |
| """Base class for all reasoning protocols""" | |
| def __init__(self, name: str, category: ProtocolCategory): | |
| self.name = name | |
| self.category = category | |
| self.enabled = True | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| """Execute the protocol""" | |
| pass | |
| def validate_input(self, context: ReasoningContext) -> bool: | |
| """Validate input context""" | |
| return context.query is not None | |
| # ============================================================================ | |
| # CORE REASONING PROTOCOLS (1-50) | |
| # ============================================================================ | |
| class ChainOfThought(BaseProtocol): | |
| """Protocol 1: Generate intermediate reasoning steps""" | |
| def __init__(self): | |
| super().__init__("Chain-of-Thought", ProtocolCategory.CORE_REASONING) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| steps = [] | |
| steps.append(f"Breaking down: {context.query}") | |
| steps.append("Step 1: Identify key components") | |
| steps.append("Step 2: Establish relationships") | |
| steps.append("Step 3: Apply logical inference") | |
| steps.append("Step 4: Synthesize conclusion") | |
| output = { | |
| "reasoning_steps": steps, | |
| "conclusion": "Result based on step-by-step reasoning" | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=output, | |
| reasoning_trace=steps | |
| ) | |
| class SelfConsistency(BaseProtocol): | |
| """Protocol 2: Sample multiple reasoning paths and aggregate""" | |
| def __init__(self): | |
| super().__init__("Self-Consistency", ProtocolCategory.CORE_REASONING) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| num_samples = kwargs.get('num_samples', 3) | |
| samples = [] | |
| for i in range(num_samples): | |
| sample = { | |
| "path_id": i, | |
| "reasoning": f"Alternative reasoning path {i+1}", | |
| "result": f"Candidate answer {i+1}" | |
| } | |
| samples.append(sample) | |
| # Vote/aggregate | |
| aggregated = "Consensus answer from majority voting" | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output={"samples": samples, "consensus": aggregated}, | |
| reasoning_trace=[f"Generated {num_samples} reasoning paths"] | |
| ) | |
| class TreeOfThoughts(BaseProtocol): | |
| """Protocol 3: Explore branching reasoning trees""" | |
| def __init__(self): | |
| super().__init__("Tree-of-Thoughts", ProtocolCategory.CORE_REASONING) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| search_method = kwargs.get('search_method', 'BFS') | |
| tree = { | |
| "root": context.query, | |
| "branches": [ | |
| {"thought": "Approach 1: Direct solution", "score": 0.8}, | |
| {"thought": "Approach 2: Decomposition", "score": 0.9}, | |
| {"thought": "Approach 3: Analogical", "score": 0.7} | |
| ], | |
| "best_path": "Approach 2 selected based on evaluation" | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=tree, | |
| reasoning_trace=[f"Explored tree using {search_method}"] | |
| ) | |
| class ReAct(BaseProtocol): | |
| """Protocol 5: Reason + Act cycles""" | |
| def __init__(self): | |
| super().__init__("ReAct", ProtocolCategory.CORE_REASONING) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| max_iterations = kwargs.get('max_iterations', 5) | |
| tools = kwargs.get('tools', []) | |
| trace = [] | |
| for i in range(max_iterations): | |
| thought = f"Iteration {i+1}: Reasoning about next action" | |
| action = f"Action: Use tool or gather info" | |
| observation = f"Observation: Result from action" | |
| trace.extend([thought, action, observation]) | |
| # Simulate convergence | |
| if i >= 2: | |
| break | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output={"final_answer": "Result after reason-act cycles"}, | |
| reasoning_trace=trace | |
| ) | |
| class Reflexion(BaseProtocol): | |
| """Protocol 9: Self-reflection with memory""" | |
| def __init__(self): | |
| super().__init__("Reflexion", ProtocolCategory.CORE_REASONING) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| # Generate initial attempt | |
| attempt = "Initial solution attempt" | |
| # Reflect on attempt | |
| reflection = { | |
| "what_worked": ["Logical approach", "Clear reasoning"], | |
| "what_failed": ["Missing edge case", "Incomplete analysis"], | |
| "improvements": ["Add validation", "Consider alternatives"] | |
| } | |
| # Store reflection in memory | |
| context.memory['reflexion_history'] = context.memory.get('reflexion_history', []) | |
| context.memory['reflexion_history'].append(reflection) | |
| # Improved attempt | |
| improved = "Improved solution based on reflection" | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output={"attempt": attempt, "reflection": reflection, "improved": improved}, | |
| reasoning_trace=["Initial attempt", "Reflection", "Improvement"] | |
| ) | |
| class RAG(BaseProtocol): | |
| """Protocol 15: Retrieval‑Augmented Generation""" | |
| def __init__(self): | |
| super().__init__("RAG", ProtocolCategory.CORE_REASONING) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| # Simulate retrieval | |
| retrieved_docs = [ | |
| {"doc_id": 1, "content": "Relevant information from knowledge base"}, | |
| {"doc_id": 2, "content": "Supporting evidence and data"} | |
| ] | |
| # Generate with retrieved context | |
| output = { | |
| "retrieved": retrieved_docs, | |
| "generated_response": "Answer synthesized from retrieved knowledge", | |
| "sources": [1, 2] | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=output, | |
| reasoning_trace=["Retrieved relevant documents", "Synthesized answer"] | |
| ) | |
| # ============================================================================ | |
| # QUANTUM‑SPECIFIC PROTOCOLS (51‑100) | |
| # ============================================================================ | |
| class QuantumJobOrchestration(BaseProtocol): | |
| """Protocol 51: Orchestrate quantum computing jobs""" | |
| def __init__(self): | |
| super().__init__("Quantum-Job-Orchestration", ProtocolCategory.QUANTUM_SPECIFIC) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| circuit = kwargs.get('circuit', None) | |
| backend = kwargs.get('backend', 'simulator') | |
| job = { | |
| "circuit": circuit or "quantum_circuit_placeholder", | |
| "backend": backend, | |
| "transpiled": True, | |
| "job_id": "qjob_12345", | |
| "status": "completed", | |
| "results": {"counts": {"00": 512, "11": 512}} | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=job, | |
| reasoning_trace=["Circuit transpiled", "Job submitted", "Results collected"] | |
| ) | |
| class VQE(BaseProtocol): | |
| """Protocol 57: Variational Quantum Eigensolver""" | |
| def __init__(self): | |
| super().__init__("VQE", ProtocolCategory.QUANTUM_SPECIFIC) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| hamiltonian = kwargs.get('hamiltonian', 'H = Z0*Z1') | |
| ansatz = kwargs.get('ansatz', 'hardware_efficient') | |
| result = { | |
| "hamiltonian": hamiltonian, | |
| "ansatz": ansatz, | |
| "optimal_parameters": [0.5, 1.2, 0.8], | |
| "ground_state_energy": -1.85, | |
| "iterations": 50 | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=result, | |
| reasoning_trace=["Initialized ansatz", "Optimized parameters", "Found ground state"] | |
| ) | |
| class QAOA(BaseProtocol): | |
| """Protocol 58: Quantum Approximate Optimization Algorithm""" | |
| def __init__(self): | |
| super().__init__("QAOA", ProtocolCategory.QUANTUM_SPECIFIC) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| problem = kwargs.get('problem', 'MaxCut') | |
| layers = kwargs.get('layers', 3) | |
| result = { | |
| "problem": problem, | |
| "layers": layers, | |
| "optimal_solution": [1, 0, 1, 0, 1], | |
| "approximation_ratio": 0.92, | |
| "objective_value": 15.3 | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=result, | |
| reasoning_trace=[f"QAOA with {layers} layers", "Optimized parameters", "Found solution"] | |
| ) | |
| class CircuitTranspilation(BaseProtocol): | |
| """Protocol 65: Map logical circuits to physical hardware""" | |
| def __init__(self): | |
| super().__init__("Circuit-Transpilation", ProtocolCategory.QUANTUM_SPECIFIC) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| circuit = kwargs.get('circuit', 'logical_circuit') | |
| backend = kwargs.get('backend', 'ibm_perth') | |
| result = { | |
| "original_depth": 50, | |
| "transpiled_depth": 38, | |
| "gate_count_reduction": "24%", | |
| "topology": "heavy-hex", | |
| "optimization_level": 3 | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=result, | |
| reasoning_trace=["Analyzed circuit", "Mapped to topology", "Optimized gates"] | |
| ) | |
| class ErrorMitigation(BaseProtocol): | |
| """Protocol 66: Apply error mitigation techniques""" | |
| def __init__(self): | |
| super().__init__("Error-Mitigation", ProtocolCategory.QUANTUM_SPECIFIC) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| technique = kwargs.get('technique', 'ZNE') | |
| result = { | |
| "technique": technique, | |
| "raw_expectation": 0.45, | |
| "mitigated_expectation": 0.72, | |
| "improvement": "60%", | |
| "confidence": 0.95 | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=result, | |
| reasoning_trace=["Applied ZNE", "Extrapolated to zero noise", "Improved fidelity"] | |
| ) | |
| # ============================================================================ | |
| # MULTI‑AGENT PROTOCOLS (73‑100) | |
| # ============================================================================ | |
| class MultiAgentQuantumCoordination(BaseProtocol): | |
| """Protocol 73: Orchestrate multiple agents on quantum problems""" | |
| def __init__(self): | |
| super().__init__("Multi-Agent-Coordination", ProtocolCategory.MULTI_AGENT) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| num_agents = kwargs.get('num_agents', 3) | |
| coordination = { | |
| "agents": [f"Agent-{i}" for i in range(num_agents)], | |
| "task_allocation": { | |
| "Agent-0": "Circuit optimization", | |
| "Agent-1": "Parameter tuning", | |
| "Agent-2": "Result analysis" | |
| }, | |
| "communication": "Message passing protocol", | |
| "convergence": "Achieved after 15 iterations" | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=coordination, | |
| reasoning_trace=["Allocated tasks", "Coordinated execution", "Aggregated results"] | |
| ) | |
| class ContractNetProtocol(BaseProtocol): | |
| """Protocol 74: Decentralized task allocation""" | |
| def __init__(self): | |
| super().__init__("Contract-Net-Protocol", ProtocolCategory.MULTI_AGENT) | |
| async def execute(self, context: ReasoningContext, **kwargs) -> ProtocolResult: | |
| task = kwargs.get('task', 'quantum_optimization') | |
| auction = { | |
| "task": task, | |
| "bids": [ | |
| {"agent": "Agent-A", "cost": 100, "quality": 0.9}, | |
| {"agent": "Agent-B", "cost": 80, "quality": 0.85}, | |
| {"agent": "Agent-C", "cost": 120, "quality": 0.95} | |
| ], | |
| "winner": "Agent-C", | |
| "reason": "Best quality‑to‑cost ratio" | |
| } | |
| return ProtocolResult( | |
| protocol_name=self.name, | |
| status=ExecutionStatus.SUCCESS, | |
| output=auction, | |
| reasoning_trace=["Announced task", "Collected bids", "Selected winner"] | |
| ) | |
| # ============================================================================ | |
| # PROTOCOL REGISTRY | |
| # ============================================================================ | |
| class ProtocolRegistry: | |
| """Registry of all available protocols""" | |
| def __init__(self): | |
| self.protocols: Dict[str, BaseProtocol] = {} | |
| self._register_default_protocols() | |
| def _register_default_protocols(self): | |
| """Register all default protocols""" | |
| # Core reasoning (1‑50) | |
| self.register(ChainOfThought()) | |
| self.register(SelfConsistency()) | |
| self.register(TreeOfThoughts()) | |
| self.register(ReAct()) | |
| self.register(Reflexion()) | |
| self.register(RAG()) | |
| # Quantum‑specific (51‑100) | |
| self.register(QuantumJobOrchestration()) | |
| self.register(VQE()) | |
| self.register(QAOA()) | |
| self.register(CircuitTranspilation()) | |
| self.register(ErrorMitigation()) | |
| # Multi‑agent (73‑100) | |
| self.register(MultiAgentQuantumCoordination()) | |
| self.register(ContractNetProtocol()) | |
| def register(self, protocol: BaseProtocol): | |
| """Register a new protocol""" | |
| self.protocols[protocol.name] = protocol | |
| logger.info(f"Registered protocol: {protocol.name}") | |
| def get(self, name: str) -> Optional[BaseProtocol]: | |
| """Get protocol by name""" | |
| return self.protocols.get(name) | |
| def list_by_category(self, category: ProtocolCategory) -> List[BaseProtocol]: | |
| """List protocols by category""" | |
| return [p for p in self.protocols.values() if p.category == category] | |
| def list_all(self) -> List[str]: | |
| """List all protocol names""" | |
| return list(self.protocols.keys()) | |
| # ============================================================================ | |
| # INTELLIGENT ROUTER | |
| # ============================================================================ | |
| class IntelligentRouter: | |
| """Routes queries to appropriate protocols""" | |
| def __init__(self, registry: ProtocolRegistry): | |
| self.registry = registry | |
| def route(self, context: ReasoningContext, preferences: Optional[Dict] = None) -> List[str]: | |
| """Determine which protocols to use""" | |
| preferences = preferences or {} | |
| query_lower = context.query.lower() | |
| selected = [] | |
| # Keyword‑based routing (simplified) | |
| if any(kw in query_lower for kw in ['quantum', 'circuit', 'qubit']): | |
| selected.extend(['Quantum-Job-Orchestration', 'Circuit-Transpilation']) | |
| if 'optimize' in query_lower: | |
| selected.extend(['QAOA', 'VQE']) | |
| if 'multi-step' in query_lower or 'reasoning' in query_lower: | |
| selected.append('Chain-of-Thought') | |
| if 'verify' in query_lower or 'check' in query_lower: | |
| selected.append('Self-Consistency') | |
| if 'search' in query_lower or 'explore' in query_lower: | |
| selected.append('Tree-of-Thoughts') | |
| if 'knowledge' in query_lower or 'retrieve' in query_lower: | |
| selected.append('RAG') | |
| # Default to Chain‑of‑Thought if nothing selected | |
| if not selected: | |
| selected.append('Chain-of-Thought') | |
| # Apply preferences | |
| if preferences.get('use_reflection', False): | |
| selected.append('Reflexion') | |
| if preferences.get('multi_agent', False): | |
| selected.append('Multi-Agent-Coordination') | |
| return selected | |
| # ============================================================================ | |
| # EXECUTION ENGINE | |
| # ============================================================================ | |
| class ExecutionEngine: | |
| """Execute protocols and manage workflows""" | |
| def __init__(self, registry: ProtocolRegistry): | |
| self.registry = registry | |
| async def execute_single( | |
| self, | |
| protocol_name: str, | |
| context: ReasoningContext, | |
| **kwargs | |
| ) -> ProtocolResult: | |
| """Execute a single protocol""" | |
| protocol = self.registry.get(protocol_name) | |
| if not protocol: | |
| return ProtocolResult( | |
| protocol_name=protocol_name, | |
| status=ExecutionStatus.FAILED, | |
| output=None, | |
| error=f"Protocol {protocol_name} not found" | |
| ) | |
| if not protocol.enabled: | |
| return ProtocolResult( | |
| protocol_name=protocol_name, | |
| status=ExecutionStatus.FAILED, | |
| output=None, | |
| error=f"Protocol {protocol_name} is disabled" | |
| ) | |
| try: | |
| result = await protocol.execute(context, **kwargs) | |
| context.history.append({ | |
| "protocol": protocol_name, | |
| "result": result.output, | |
| "trace": result.reasoning_trace | |
| }) | |
| return result | |
| except Exception as e: | |
| logger.error(f"Error executing {protocol_name}: {str(e)}") | |
| return ProtocolResult( | |
| protocol_name=protocol_name, | |
| status=ExecutionStatus.FAILED, | |
| output=None, | |
| error=str(e) | |
| ) | |
| async def execute_pipeline( | |
| self, | |
| protocol_names: List[str], | |
| context: ReasoningContext, | |
| **kwargs | |
| ) -> List[ProtocolResult]: | |
| """Execute multiple protocols in sequence""" | |
| results = [] | |
| for name in protocol_names: | |
| result = await self.execute_single(name, context, **kwargs) | |
| results.append(result) | |
| # Stop on failure if requested | |
| if kwargs.get('stop_on_failure', False) and result.status == ExecutionStatus.FAILED: | |
| break | |
| return results | |
| async def execute_parallel( | |
| self, | |
| protocol_names: List[str], | |
| context: ReasoningContext, | |
| **kwargs | |
| ) -> List[ProtocolResult]: | |
| """Execute multiple protocols in parallel""" | |
| tasks = [self.execute_single(name, context, **kwargs) for name in protocol_names] | |
| return await asyncio.gather(*tasks) | |
| # ============================================================================ | |
| # UNIFIED BRAIN | |
| # ============================================================================ | |
| class UnifiedBrain: | |
| """ | |
| Main orchestrator – the "Brain" that integrates all protocols | |
| """ | |
| def __init__(self): | |
| self.registry = ProtocolRegistry() | |
| self.router = IntelligentRouter(self.registry) | |
| self.engine = ExecutionEngine(self.registry) | |
| self.active_contexts: Dict[str, ReasoningContext] = {} | |
| logger.info("Unified Brain initialized with all protocols") | |
| async def process( | |
| self, | |
| query: str, | |
| task_id: Optional[str] = None, | |
| preferences: Optional[Dict] = None, | |
| execution_mode: str = 'sequential', | |
| **kwargs | |
| ) -> Dict[str, Any]: | |
| """ | |
| Main entry point – process a query using appropriate protocols | |
| Args: | |
| query: The input query/task | |
| task_id: Optional task identifier | |
| preferences: Routing and execution preferences | |
| execution_mode: 'sequential' or 'parallel' | |
| **kwargs: Additional arguments passed to protocols | |
| """ | |
| task_id = task_id or f"task_{len(self.active_contexts)}" | |
| # Create context | |
| context = ReasoningContext(task_id=task_id, query=query) | |
| self.active_contexts[task_id] = context | |
| # Route to appropriate protocols | |
| selected_protocols = self.router.route(context, preferences) | |
| logger.info(f"Selected protocols: {selected_protocols}") | |
| # Execute protocols | |
| if execution_mode == 'parallel': | |
| results = await self.engine.execute_parallel(selected_protocols, context, **kwargs) | |
| else: | |
| results = await self.engine.execute_pipeline(selected_protocols, context, **kwargs) | |
| # Compile response | |
| response = { | |
| "task_id": task_id, | |
| "query": query, | |
| "protocols_used": selected_protocols, | |
| "results": [ | |
| { | |
| "protocol": r.protocol_name, | |
| "status": r.status.value, | |
| "output": r.output, | |
| "trace": r.reasoning_trace | |
| } | |
| for r in results | |
| ], | |
| "context_history": context.history, | |
| "success": all(r.status == ExecutionStatus.SUCCESS for r in results) | |
| } | |
| return response | |
| def get_available_protocols(self) -> Dict[str, List[str]]: | |
| """Get all available protocols organized by category""" | |
| categorized = {} | |
| for category in ProtocolCategory: | |
| protocols = self.registry.list_by_category(category) | |
| categorized[category.value] = [p.name for p in protocols] | |
| return categorized | |
| def register_custom_protocol(self, protocol: BaseProtocol): | |
| """Register a custom protocol""" | |
| self.registry.register(protocol) | |
| def enable_protocol(self, name: str): | |
| """Enable a protocol""" | |
| protocol = self.registry.get(name) | |
| if protocol: | |
| protocol.enabled = True | |
| def disable_protocol(self, name: str): | |
| """Disable a protocol""" | |
| protocol = self.registry.get(name) | |
| if protocol: | |
| protocol.enabled = False | |