# service.py
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional, Callable, Awaitable

from config import settings
from models import LLMServiceObj, ResultObj
from rabbit_repo import RabbitRepo
from runners.base import ILLMRunner


@dataclass
class _Session:
    Runner: Optional[ILLMRunner]
    FullSessionId: str


class LLMService:
    """
    Python/Gradio equivalent of your .NET LLMService.
    Keeps identical field names and queue semantics when talking to RabbitMQ.
    """

    def __init__(
        self,
        publisher: RabbitRepo,
        runner_factory: Callable[[Dict[str, Any]], Awaitable[ILLMRunner]],
    ):
        self._pub: RabbitRepo = publisher
        self._runner_factory = runner_factory  # async factory: dict -> ILLMRunner
        self._sessions: Dict[str, _Session] = {}
        self._ready = asyncio.Event()
        self._ready.set()  # call clear()/set() around any history preload
        self._service_id_lc = settings.SERVICE_ID.lower()

    async def init(self) -> None:
        """Hook to preload history/sessions; call self._ready.set() when finished."""
        pass

    # ---------------------------- helpers ----------------------------

    def _to_model(self, data: Any) -> LLMServiceObj:
        """Accepts an LLMServiceObj or dict and returns a validated LLMServiceObj."""
        if isinstance(data, LLMServiceObj):
            return data
        if isinstance(data, dict):
            return LLMServiceObj(**data)
        raise TypeError("LLMService expects an object payload (dict/LLMServiceObj).")

    async def _emit_result(
        self,
        obj: LLMServiceObj | Dict[str, Any],
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
        include_llm_message: bool = True,
    ) -> None:
        """
        Build a ResultObj-style message on the wire, mirroring your .NET usage.
        check_system=True -> don't publish if obj.IsSystemLlm is True (matches your rule).
        """
        llm = obj if isinstance(obj, LLMServiceObj) else LLMServiceObj(**obj)
        llm.ResultMessage = message
        llm.ResultSuccess = success
        if include_llm_message:
            llm.LlmMessage = message  # same text whether success or failure
        if check_system and llm.IsSystemLlm:
            return
        # You publish LLMServiceObj on "llmServiceMessage"/"llmSessionMessage" in .NET
        await self._pub.publish(queue, llm)

    def _session_for(self, session_id: str) -> Optional[_Session]:
        return self._sessions.get(session_id)

    # ---------------------------- API methods ----------------------------

    async def StartProcess(self, payload: Any) -> None:
        llm = self._to_model(payload)

        # Validate critical fields
        if not llm.RequestSessionId:
            await self._emit_result(llm, "Error: RequestSessionId is required.", False, "llmServiceMessage")
            return
        if not llm.LLMRunnerType:
            await self._emit_result(llm, "Error: LLMRunnerType is required.", False, "llmServiceMessage")
            return

        # Construct the session id like C#: RequestSessionId + "_" + LLMRunnerType
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id

        # Wait for readiness (max 120 s), exactly like the C# logic
        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._emit_result(
                llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
            )
            return

        sess = self._session_for(session_id)
        runner = sess.Runner if sess else None
        create_new = (runner is None) or getattr(runner, "IsStateFailed", False)

        if create_new:
            # Remove the previous runner if one exists
            if runner:
                try:
                    await runner.RemoveProcess(session_id)
                except Exception:
                    pass

            # Create a runner from the factory (pass a plain dict for decoupling)
            runner = await self._runner_factory(llm.model_dump(by_alias=True))
            if not runner.IsEnabled:
                await self._emit_result(
                    llm,
                    f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
                    True,
                    "llmServiceMessage",
                )
                return

            await self._emit_result(
                llm,
                f"Starting {runner.Type} {settings.SERVICE_ID} Expert",
                True,
                "llmServiceMessage",
                check_system=True,
            )
            await runner.StartProcess(llm.model_dump(by_alias=True))
            self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)

            # Friendly greeting for your renamed service
            if self._service_id_lc in {"monitor", "gradllm"}:
                await self._emit_result(
                    llm,
                    f"Hi, I'm {runner.Type}, your {settings.SERVICE_ID} Assistant. How can I help you?",
                    True,
                    "llmServiceMessage",
                    check_system=True,
                )

        # Notify "started" (full LLMServiceObj)
        await self._pub.publish("llmServiceStarted", llm)

    async def RemoveSession(self, payload: Any) -> None:
        llm = self._to_model(payload)
        base = (llm.SessionId or "").split("_")[0]
        if not base:
            await self._emit_result(llm, "Error: SessionId is required to remove sessions.", False, "llmServiceMessage")
            return

        targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
        msgs: list[str] = []
        ok = True
        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
                continue
            try:
                await s.Runner.RemoveProcess(sid)
                s.Runner = None
                msgs.append(sid)
            except Exception as e:
                ok = False
                msgs.append(f"Error {sid}: {e}")

        if ok:
            await self._emit_result(
                llm,
                f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
                True,
                "llmSessionMessage",
                check_system=True,
            )
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")

    async def StopRequest(self, payload: Any) -> None:
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
            return
        await s.Runner.StopRequest(sid)
        await self._emit_result(
            llm,
            f"Success: {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted.",
            True,
            "llmServiceMessage",
            check_system=True,
        )

    async def UserInput(self, payload: Any) -> None:
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
            return

        r: ILLMRunner = s.Runner
        if getattr(r, "IsStateStarting", False):
            await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
            return
        if getattr(r, "IsStateFailed", False):
            await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
            return

        # The runner pushes partial output itself if desired; no ack is sent here.
        await r.SendInputAndGetResponse(llm.model_dump(by_alias=True))
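
    # Illustrative QueryIndexResult payload (a sketch, not a schema; only the
    # keys read below are assumed, with field names mirroring the .NET side):
    #
    #     {
    #         "Success": True,
    #         "Message": "RAG query complete",
    #         "QueryResults": [{"Output": "chunk one"}, {"Output": "chunk two"}],
    #     }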

    async def QueryIndexResult(self, payload: Any) -> None:
        """
        Your .NET listener concatenates RAG outputs, sets ResultObj, and notifies
        the coordinator. Here, we forward a service message containing the same
        info so the UI can reflect completion.

        'payload' usually has: Success, Message, QueryResults: [{Output: "..."}]
        """
        try:
            data = payload if isinstance(payload, dict) else {}
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join(x.get("Output", "") for x in outputs if isinstance(x, dict))
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(
                    Message=data.get("Message", ""),
                    Success=bool(data.get("Success", False)),
                    Data=rag_data,
                ),
            )
        except Exception as e:
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(Message=str(e), Success=False),
            )

    async def GetFunctionRegistry(self, filtered: bool = False) -> None:
        """
        Wire this up to your real registry when ready. For now, mimic your
        success message payload.
        """
        catalog = "{}"  # replace with real JSON
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
            ResultObj(Message=msg, Success=True),
        )
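

# ---------------------------------------------------------------------------
# Minimal wiring sketch (illustrative only, not part of the service).
# `_StubPublisher`, `_StubRunner`, and `_demo` are hypothetical stand-ins for
# your real RabbitRepo connection and runner factory. It assumes LLMServiceObj
# accepts the field names used below and defaults the rest (e.g. IsSystemLlm).
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class _StubPublisher:
        """Stands in for RabbitRepo; prints instead of publishing."""

        async def publish(self, queue: str, obj: Any) -> None:
            print(f"[{queue}] {obj!r}")

    class _StubRunner:
        """Bare-minimum object exposing the ILLMRunner surface used above."""

        IsEnabled = True
        IsStateFailed = False
        IsStateStarting = False
        Type = "Stub"

        async def StartProcess(self, payload: Dict[str, Any]) -> None:
            pass

        async def RemoveProcess(self, session_id: str) -> None:
            pass

        async def StopRequest(self, session_id: str) -> None:
            pass

        async def SendInputAndGetResponse(self, payload: Dict[str, Any]) -> None:
            pass

    async def _demo() -> None:
        async def factory(payload: Dict[str, Any]) -> ILLMRunner:
            return _StubRunner()  # type: ignore[return-value]

        service = LLMService(_StubPublisher(), factory)  # type: ignore[arg-type]
        await service.StartProcess({"RequestSessionId": "abc123", "LLMRunnerType": "Stub"})
        await service.UserInput({"SessionId": "abc123_Stub", "LlmMessage": "hello"})
        await service.RemoveSession({"SessionId": "abc123_Stub"})

    asyncio.run(_demo())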