# service.py
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional, Callable, Awaitable
from config import settings
from models import LLMServiceObj, ResultObj
from rabbit_repo import RabbitRepo
from runners.base import ILLMRunner
@dataclass
class _Session:
Runner: Optional[ILLMRunner]
FullSessionId: str
class LLMService:
"""
Python/Gradio equivalent of your .NET LLMService.
Keeps identical field names and queue semantics when talking to RabbitMQ.
"""
def __init__(
self,
publisher: RabbitRepo,
runner_factory: Callable[[Dict[str, Any]], Awaitable[ILLMRunner]],
):
self._pub: RabbitRepo = publisher
self._runner_factory = runner_factory # async factory: dict -> ILLMRunner
self._sessions: Dict[str, _Session] = {}
self._ready = asyncio.Event()
self._ready.set() # call clear()/set() if you preload history
self._service_id_lc = settings.SERVICE_ID.lower()
async def init(self) -> None:
"""Hook to preload history/sessions; call self._ready.set() when finished."""
pass
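        # A minimal sketch of the intended pattern (loader name hypothetical):
        #     self._ready.clear()
        #     self._sessions = await self._load_saved_sessions()
        #     self._ready.set()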
# ---------------------------- helpers ----------------------------
def _to_model(self, data: Any) -> LLMServiceObj:
"""Accepts LLMServiceObj or dict and returns a validated LLMServiceObj."""
if isinstance(data, LLMServiceObj):
return data
if isinstance(data, dict):
return LLMServiceObj(**data)
raise TypeError("LLMService expects an object payload (dict/LLMServiceObj).")
async def _emit_result(
self,
obj: LLMServiceObj | Dict[str, Any],
message: str,
success: bool,
queue: str,
*,
check_system: bool = False,
include_llm_message: bool = True,
) -> None:
"""
Build a ResultObj-style message on the wire, mirroring your .NET usage.
check_system=True -> don't publish if obj.IsSystemLlm is True (matches your rule).
"""
        llm = self._to_model(obj)
llm.ResultMessage = message
llm.ResultSuccess = success
if include_llm_message:
            llm.LlmMessage = message
if check_system and llm.IsSystemLlm:
return
# You publish LLMServiceObj on "llmServiceMessage"/"llmSessionMessage" in .NET
await self._pub.publish(queue, llm)
def _session_for(self, session_id: str) -> Optional[_Session]:
return self._sessions.get(session_id)
# ---------------------------- API methods ----------------------------
async def StartProcess(self, payload: Any) -> None:
llm = self._to_model(payload)
# Validate critical fields
if not llm.RequestSessionId:
await self._emit_result(llm, "Error: RequestSessionId is required.", False, "llmServiceMessage")
return
if not llm.LLMRunnerType:
await self._emit_result(llm, "Error: LLMRunnerType is required.", False, "llmServiceMessage")
return
# Construct session id like C#: RequestSessionId + "_" + LLMRunnerType
session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
llm.SessionId = session_id
# Wait ready (max 120s) exactly like the C# logic
try:
await asyncio.wait_for(self._ready.wait(), timeout=120)
except asyncio.TimeoutError:
await self._emit_result(
llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
)
return
sess = self._session_for(session_id)
runner = sess.Runner if sess else None
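        # Reuse a healthy existing runner; rebuild only when it is missing or failed.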
create_new = (runner is None) or getattr(runner, "IsStateFailed", False)
if create_new:
# Remove previous runner if exists
if runner:
try:
await runner.RemoveProcess(session_id)
except Exception:
pass
# Create runner from factory (pass a plain dict for decoupling)
runner = await self._runner_factory(llm.model_dump(by_alias=True))
if not runner.IsEnabled:
await self._emit_result(
llm,
f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
True,
"llmServiceMessage",
)
return
await self._emit_result(
llm, f"Starting {runner.Type} {settings.SERVICE_ID} Expert", True, "llmServiceMessage", check_system=True
)
await runner.StartProcess(llm.model_dump(by_alias=True))
self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)
# Friendly greeting for your renamed service
if self._service_id_lc in {"monitor", "gradllm"}:
await self._emit_result(
llm,
f"Hi i'm {runner.Type} your {settings.SERVICE_ID} Assistant. How can I help you.",
True,
"llmServiceMessage",
check_system=True,
)
# Notify "started" (full LLMServiceObj)
await self._pub.publish("llmServiceStarted", llm)
async def RemoveSession(self, payload: Any) -> None:
llm = self._to_model(payload)
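        # Session ids are "<RequestSessionId>_<RunnerType>" (see StartProcess),
        # so strip the trailing runner-type segment to recover the base id.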
        base = (llm.SessionId or "").rsplit("_", 1)[0]
if not base:
await self._emit_result(llm, "Error: SessionId is required to remove sessions.", False, "llmServiceMessage")
return
targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
msgs: list[str] = []
ok = True
for sid in targets:
s = self._sessions.get(sid)
if not s or not s.Runner:
continue
try:
await s.Runner.RemoveProcess(sid)
                s.Runner = None
                self._sessions.pop(sid, None)  # drop the stale entry as well
                msgs.append(sid)
except Exception as e:
ok = False
msgs.append(f"Error {sid}: {e}")
if ok:
await self._emit_result(
llm,
f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
True,
"llmSessionMessage",
check_system=True,
)
else:
await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")
async def StopRequest(self, payload: Any) -> None:
llm = self._to_model(payload)
sid = llm.SessionId or ""
s = self._session_for(sid)
if not s or not s.Runner:
await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
return
await s.Runner.StopRequest(sid)
await self._emit_result(
llm,
f"Success {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted",
True,
"llmServiceMessage",
check_system=True,
)
async def UserInput(self, payload: Any) -> None:
llm = self._to_model(payload)
sid = llm.SessionId or ""
s = self._session_for(sid)
if not s or not s.Runner:
await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
return
r: ILLMRunner = s.Runner
if getattr(r, "IsStateStarting", False):
await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
return
if getattr(r, "IsStateFailed", False):
await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
return
        # The runner pushes partial/final responses itself; no ack is
        # published from here.
        await r.SendInputAndGetResponse(llm.model_dump(by_alias=True))
async def QueryIndexResult(self, payload: Any) -> None:
"""
Your .NET listener concatenates RAG outputs, sets ResultObj, and notifies the coordinator.
Here, we forward a service message containing the same info so the UI can reflect completion.
'payload' usually has: Success, Message, QueryResults: [{Output: "..."}]
"""
try:
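            # Non-dict payloads fall back to {} so a malformed message still
            # yields a failed ResultObj instead of raising here.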
data = payload if isinstance(payload, dict) else {}
outputs = data.get("QueryResults") or []
rag_data = "\n".join([x.get("Output", "") for x in outputs if isinstance(x, dict)])
await self._pub.publish(
"llmServiceMessage",
ResultObj(
Message=data.get("Message", ""),
Success=bool(data.get("Success", False)),
Data=rag_data,
),
)
except Exception as e:
await self._pub.publish(
"llmServiceMessage",
ResultObj(Message=str(e), Success=False),
)
async def GetFunctionRegistry(self, filtered: bool = False) -> None:
"""
Wire up to your real registry when ready.
For now, mimic your success message payload.
"""
catalog = "{}" # replace with real JSON
msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
await self._pub.publish(
"llmServiceMessage",
ResultObj(Message=msg, Success=True),
)
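# ---------------------------- example wiring ----------------------------
# A minimal wiring sketch, not a definitive setup: `EchoRunner` and the
# zero-argument RabbitRepo() constructor are hypothetical; adapt to your
# real broker configuration and ILLMRunner implementation.
#
#     async def make_runner(payload: Dict[str, Any]) -> ILLMRunner:
#         return EchoRunner(payload)  # any concrete ILLMRunner
#
#     async def main() -> None:
#         service = LLMService(publisher=RabbitRepo(), runner_factory=make_runner)
#         await service.init()
#         await service.StartProcess(
#             {"RequestSessionId": "req-1", "LLMRunnerType": "chat"}
#         )
#
#     asyncio.run(main())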