# service.py
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional

from config import settings
from models import LLMServiceObj, ResultObj
from rabbit_repo import RabbitRepo
from runners.base import ILLMRunner


@dataclass
class _Session:
    Runner: Optional[ILLMRunner]
    FullSessionId: str


class LLMService:
    """
    Python/Gradio equivalent of your .NET LLMService.
    Keeps identical field names and queue semantics when talking to RabbitMQ.
    """

    def __init__(self, publisher: RabbitRepo, runner_factory):
        self._pub = publisher
        self._runner_factory = runner_factory  # async factory: dict | LLMServiceObj -> ILLMRunner
        self._sessions: Dict[str, _Session] = {}
        self._ready = asyncio.Event()
        # If you need an async load (history, etc.), call self._ready.clear() and
        # later self._ready.set(); StartProcess waits on this event.

    async def init(self):
        """
        Hook to preload history/sessions if needed; call self._ready.set() when finished.
        """
        # Example:
        # self._ready.clear()
        # await load_history()
        # Mark the service ready; without this, StartProcess would always time out.
        self._ready.set()

    # ---------------------------- helpers ----------------------------

    def _to_model(self, data: Any) -> LLMServiceObj:
        """
        Accepts LLMServiceObj or dict and returns a validated LLMServiceObj.
        """
        if isinstance(data, LLMServiceObj):
            return data
        if isinstance(data, dict):
            return LLMServiceObj(**data)
        # If your pipeline ever sends compressed strings here, decompress+parse first
        # (see the sketch below).
        raise TypeError("LLMService expects an object payload (dict/LLMServiceObj).")
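    # A minimal sketch of the decompress+parse path mentioned above, in case the
    # producer ever sends compressed strings instead of dicts. The base64+gzip
    # framing is an assumption; match it to whatever your pipeline actually emits.
    # Nothing calls this by default.
    def _from_compressed(self, blob: str) -> LLMServiceObj:
        import base64
        import gzip
        import json

        raw = gzip.decompress(base64.b64decode(blob))
        return self._to_model(json.loads(raw.decode("utf-8")))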
    async def _emit_result(
        self,
        obj: LLMServiceObj,
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
        include_llm_message: bool = True,
    ):
        """
        Build a ResultObj-style message on the wire, mirroring your .NET usage.
        check_system=True -> don't publish if obj.IsSystemLlm is True (matches your rule).
        """
        obj.ResultMessage = message
        obj.ResultSuccess = success
        if include_llm_message:
            obj.LlmMessage = message
        if check_system and obj.IsSystemLlm:
            return
        # You publish LLMServiceObj on "llmServiceMessage"/"llmSessionMessage" in .NET;
        # that lets the coordinator show the assistant text and statuses.
        await self._pub.publish(queue, obj)

    def _session_for(self, session_id: str) -> Optional[_Session]:
        return self._sessions.get(session_id)

    # ---------------------------- API methods ----------------------------

    async def StartProcess(self, payload: Any):
        llm = self._to_model(payload)
        # Construct the Python-side session id like C#: RequestSessionId + "_" + LLMRunnerType
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id

        # Wait for readiness (max 120s), exactly like the C# logic
        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._emit_result(
                llm, "Timed out waiting for initialization.", False,
                "llmServiceMessage", check_system=True,
            )
            return

        sess = self._session_for(session_id)
        is_runner_null = (sess is None) or (sess.Runner is None)
        create_new = is_runner_null or sess.Runner.IsStateFailed

        if create_new:
            # Remove the previous runner if one exists
            if sess and sess.Runner:
                try:
                    await sess.Runner.RemoveProcess(session_id)
                except Exception:
                    pass

            # Create a runner from the factory
            runner: ILLMRunner = await self._runner_factory(llm.model_dump())
            if not runner.IsEnabled:
                await self._emit_result(
                    llm,
                    f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
                    True,
                    "llmServiceMessage",
                )
                return

            await self._emit_result(
                llm,
                f"Starting {runner.Type} {settings.SERVICE_ID} Expert",
                True,
                "llmServiceMessage",
                check_system=True,
            )
            await runner.StartProcess(llm.model_dump())
            self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)

            # Mirror the friendly greeting, gated by service id (you renamed yours to gradllm)
            if settings.SERVICE_ID.lower() in {"monitor", "gradllm"}:
                await self._emit_result(
                    llm,
                    f"Hi, I'm {runner.Type}, your {settings.SERVICE_ID} Assistant. How can I help you?",
                    True,
                    "llmServiceMessage",
                    check_system=True,
                )

        # Notify "started"
        await self._pub.publish("llmServiceStarted", llm)
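    # For reference, the factory passed to __init__ is awaited with the payload dict
    # and must return an object satisfying ILLMRunner. A minimal sketch (TestRunner
    # and its module path are hypothetical stand-ins for your real runner classes):
    #
    #   async def runner_factory(payload: dict) -> ILLMRunner:
    #       from runners.test_runner import TestRunner  # hypothetical
    #       return TestRunner(payload)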
    async def RemoveSession(self, payload: Any):
        llm = self._to_model(payload)
        base = (llm.SessionId or "").split("_")[0]
        targets = [k for k in self._sessions if k.startswith(base + "_")]
        msgs = []
        ok = True
        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
                continue
            try:
                await s.Runner.RemoveProcess(sid)
                s.Runner = None
                msgs.append(sid)
            except Exception as e:
                ok = False
                msgs.append(f"Error {sid}: {e}")
        if ok:
            await self._emit_result(
                llm,
                f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
                True,
                "llmSessionMessage",
                check_system=True,
            )
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")

    async def StopRequest(self, payload: Any):
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(
                llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage"
            )
            return
        await s.Runner.StopRequest(sid)
        await self._emit_result(
            llm,
            f"Success: {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted",
            True,
            "llmServiceMessage",
            check_system=True,
        )

    async def UserInput(self, payload: Any):
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(
                llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage"
            )
            return
        r: ILLMRunner = s.Runner
        if r.IsStateStarting:
            await self._emit_result(
                llm, "Please wait, the assistant is starting...", False, "llmServiceMessage"
            )
            return
        if r.IsStateFailed:
            await self._emit_result(
                llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage"
            )
            return
        # Let the runner push partials itself if desired
        await r.SendInputAndGetResponse(llm.model_dump())

    async def QueryIndexResult(self, payload: Any):
        """
        Your .NET listener concatenates RAG outputs, sets ResultObj, and notifies the
        coordinator. Here, we forward a service message containing the same info so
        the UI can reflect completion.
        'payload' usually has: Success, Message, QueryResults: [{Output: "..."}]
        """
        try:
            data = payload if isinstance(payload, dict) else {}
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join(x.get("Output", "") for x in outputs if isinstance(x, dict))
            # Shape compatible with your coordinator expectations
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(
                    Message=data.get("Message", ""),
                    Success=bool(data.get("Success", False)),
                    Data=rag_data,
                ),
            )
        except Exception as e:
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(Message=str(e), Success=False),
            )

    async def GetFunctionRegistry(self, filtered: bool = False):
        """
        Wire up to your real registry when ready. For now, mimic your success
        message payload.
        """
        catalog = "{}"  # replace with real JSON from your function registry
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
            ResultObj(Message=msg, Success=True),
        )
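# --- quick smoke test (a sketch, not part of the real wiring: _FakePub, _EchoRunner,
# and _factory are hypothetical test doubles, and the LLMServiceObj field subset used
# below assumes the model tolerates missing optional fields) ---
if __name__ == "__main__":

    class _FakePub:
        async def publish(self, queue, obj):
            print(queue, "->", getattr(obj, "ResultMessage", None) or getattr(obj, "Message", obj))

    class _EchoRunner:
        Type = "Echo"
        IsEnabled = True
        IsStateStarting = False
        IsStateFailed = False

        async def StartProcess(self, payload):
            pass

        async def StopRequest(self, sid):
            pass

        async def RemoveProcess(self, sid):
            pass

        async def SendInputAndGetResponse(self, payload):
            print("echo:", payload)

    async def _factory(payload):
        return _EchoRunner()

    async def _main():
        svc = LLMService(_FakePub(), _factory)
        await svc.init()
        await svc.StartProcess({"RequestSessionId": "demo", "LLMRunnerType": "Echo"})

    asyncio.run(_main())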