# service.py
import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional

from config import settings
from models import LLMServiceObj, ResultObj
from rabbit_repo import RabbitRepo
from runners.base import ILLMRunner


@dataclass
class _Session:
    Runner: Optional[ILLMRunner]
    FullSessionId: str

class LLMService:
    """
    Python/Gradio equivalent of the .NET LLMService.
    Keeps identical field names and queue semantics when talking to RabbitMQ.
    """

    def __init__(self, publisher: RabbitRepo, runner_factory):
        self._pub = publisher
        self._runner_factory = runner_factory  # async factory: dict | LLMServiceObj -> ILLMRunner
        self._sessions: Dict[str, _Session] = {}
        self._ready = asyncio.Event()
        # If async loading (history, etc.) is needed, clear self._ready here and set it in init().
        self._ready.set()

    async def init(self):
        """
        Hook to preload history/sessions if needed; call self._ready.set() when finished.
        """
        # Example:
        # self._ready.clear()
        # await load_history()
        # self._ready.set()
        pass

    # ---------------------------- helpers ----------------------------

    def _to_model(self, data: Any) -> LLMServiceObj:
        """
        Accepts an LLMServiceObj or a dict and returns a validated LLMServiceObj.
        """
        if isinstance(data, LLMServiceObj):
            return data
        if isinstance(data, dict):
            return LLMServiceObj(**data)
        # If the pipeline ever sends compressed strings here, decompress and parse
        # them first (see the sketch below).
        raise TypeError("LLMService expects an object payload (dict/LLMServiceObj).")
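
    # A minimal sketch of that decompress-and-parse step, assuming payloads would
    # arrive as base64-encoded gzip JSON (the encoding is an assumption, not
    # something this codebase confirms):
    #
    #   import base64, gzip, json
    #
    #   def _from_compressed(blob: str) -> LLMServiceObj:
    #       raw = gzip.decompress(base64.b64decode(blob))
    #       return LLMServiceObj(**json.loads(raw))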

    async def _emit_result(
        self,
        obj: LLMServiceObj,
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
        include_llm_message: bool = True,
    ):
        """
        Build a ResultObj-style message on the wire, mirroring the .NET usage.
        check_system=True -> don't publish if obj.IsSystemLlm is True (matches the .NET rule).
        """
        obj.ResultMessage = message
        obj.ResultSuccess = success
        if include_llm_message:
            obj.LlmMessage = f"<Success>{message}</Success>" if success else f"<Error>{message}</Error>"
        if check_system and obj.IsSystemLlm:
            return
        # The .NET side publishes LLMServiceObj on "llmServiceMessage"/"llmSessionMessage",
        # which lets the coordinator show the assistant text and statuses.
        await self._pub.publish(queue, obj)
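
    # For reference, after a call such as
    #   await self._emit_result(obj, "done", True, "llmServiceMessage")
    # the published object carries:
    #   obj.ResultMessage == "done"
    #   obj.ResultSuccess is True
    #   obj.LlmMessage == "<Success>done</Success>"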

    def _session_for(self, session_id: str) -> Optional[_Session]:
        return self._sessions.get(session_id)

    # ---------------------------- API methods ----------------------------

    async def StartProcess(self, payload: Any):
        llm = self._to_model(payload)
        # Construct the Python-side session id like C#: RequestSessionId + "_" + LLMRunnerType
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id

        # Wait for readiness (max 120 s), exactly like the C# logic.
        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._emit_result(
                llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
            )
            return

        sess = self._session_for(session_id)
        is_runner_null = (sess is None) or (sess.Runner is None)
        create_new = is_runner_null or (sess and sess.Runner and sess.Runner.IsStateFailed)

        if create_new:
            # Remove the previous runner, if any.
            if sess and sess.Runner:
                try:
                    await sess.Runner.RemoveProcess(session_id)
                except Exception:
                    pass

            # Create a runner from the factory.
            runner: ILLMRunner = await self._runner_factory(llm.model_dump())
            if not runner.IsEnabled:
                await self._emit_result(
                    llm,
                    f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
                    True,
                    "llmServiceMessage",
                )
                return

            await self._emit_result(
                llm,
                f"Starting {runner.Type} {settings.SERVICE_ID} Expert",
                True,
                "llmServiceMessage",
                check_system=True,
            )
            await runner.StartProcess(llm.model_dump())
            self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)

            # Mirror the .NET friendly greeting, gated by service id (renamed to "gradllm" here).
            if settings.SERVICE_ID.lower() in {"monitor", "gradllm"}:
                await self._emit_result(
                    llm,
                    f"Hi, I'm {runner.Type}, your {settings.SERVICE_ID} Assistant. How can I help you?",
                    True,
                    "llmServiceMessage",
                    check_system=True,
                )

        # Notify "started".
        await self._pub.publish("llmServiceStarted", llm)
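
    # Illustrative StartProcess payload (field names taken from the code above;
    # any further fields LLMServiceObj requires are assumptions):
    #
    #   await service.StartProcess({
    #       "RequestSessionId": "req-42",
    #       "LLMRunnerType": "TestLLM",
    #   })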

    async def RemoveSession(self, payload: Any):
        llm = self._to_model(payload)
        # Remove every runner whose session id shares this request's base id.
        base = (llm.SessionId or "").split("_")[0]
        targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
        msgs = []
        ok = True
        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
                continue
            try:
                await s.Runner.RemoveProcess(sid)
                s.Runner = None
                msgs.append(sid)
            except Exception as e:
                ok = False
                msgs.append(f"Error {sid}: {e}")
        if ok:
            await self._emit_result(
                llm,
                f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
                True,
                "llmSessionMessage",
                check_system=True,
            )
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")

    async def StopRequest(self, payload: Any):
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
            return
        await s.Runner.StopRequest(sid)
        await self._emit_result(
            llm,
            f"Success: {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted.",
            True,
            "llmServiceMessage",
            check_system=True,
        )

    async def UserInput(self, payload: Any):
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
            return
        r: ILLMRunner = s.Runner
        if r.IsStateStarting:
            await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
            return
        if r.IsStateFailed:
            await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
            return
        # The runner pushes partial responses itself if desired.
        await r.SendInputAndGetResponse(llm.model_dump())
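
    # Typical flow once a session exists (the "UserInput" field name is an
    # assumption; adjust to whatever LLMServiceObj actually defines):
    #
    #   await service.UserInput({"SessionId": "req-42_TestLLM", "UserInput": "hello"})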

    async def QueryIndexResult(self, payload: Any):
        """
        The .NET listener concatenates RAG outputs, sets ResultObj, and notifies the coordinator.
        Here we forward a service message carrying the same info so the UI can reflect completion.
        'payload' usually has: Success, Message, QueryResults: [{Output: "..."}]
        """
        try:
            data = payload if isinstance(payload, dict) else {}
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join(x.get("Output", "") for x in outputs if isinstance(x, dict))
            # Shape compatible with the coordinator's expectations.
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(
                    Message=data.get("Message", ""),
                    Success=bool(data.get("Success", False)),
                    Data=rag_data,
                ),
            )
        except Exception as e:
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(Message=str(e), Success=False),
            )
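
    # Illustrative QueryIndexResult payload (shape from the docstring above;
    # the values are made up):
    #
    #   {
    #       "Success": True,
    #       "Message": "RAG query complete",
    #       "QueryResults": [{"Output": "chunk one"}, {"Output": "chunk two"}],
    #   }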

    async def GetFunctionRegistry(self, filtered: bool = False):
        """
        Wire this up to the real registry when ready.
        For now, mimic the .NET success-message payload.
        """
        catalog = "{}"  # replace with real JSON
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
            ResultObj(Message=msg, Success=True),
        )
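

# ---------------------------- usage sketch ----------------------------
# A minimal wiring example, assuming RabbitRepo can be constructed without
# arguments and that a concrete runner module exists (both are assumptions;
# adapt to the real constructors in this repo):
#
#   async def runner_factory(payload: dict) -> ILLMRunner:
#       from runners.llama_runner import LlamaRunner  # hypothetical runner
#       return LlamaRunner(payload)
#
#   async def main():
#       pub = RabbitRepo()  # constructor args assumed
#       service = LLMService(pub, runner_factory)
#       await service.init()
#       await service.StartProcess({"RequestSessionId": "req-42", "LLMRunnerType": "TestLLM"})
#
#   asyncio.run(main())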