import asyncio
from dataclasses import dataclass
from typing import Any, Dict, Optional

from config import settings
from models import LLMServiceObj, ResultObj
from rabbit_repo import RabbitRepo
from runners.base import ILLMRunner


@dataclass
class _Session:
    Runner: Optional[ILLMRunner]
    FullSessionId: str


class LLMService:
    """
    Python/Gradio equivalent of your .NET LLMService.
    Keeps identical field names and queue semantics when talking to RabbitMQ.
    """

    def __init__(self, publisher: RabbitRepo, runner_factory):
        self._pub = publisher
        # runner_factory is awaited with the LLMServiceObj as a dict and must return an ILLMRunner.
        self._runner_factory = runner_factory
        self._sessions: Dict[str, _Session] = {}
        self._ready = asyncio.Event()
        # Nothing is preloaded here, so the service is marked ready immediately.
        # If init() is used to preload state, leave the event unset here and set it there instead.
        self._ready.set()

    async def init(self):
        """
        Hook to preload history/sessions if needed; call self._ready.set() when finished.
        """
        pass
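        # Illustrative preload sketch (assumptions: a deferred-readiness __init__ that does NOT
        # call self._ready.set(), and a hypothetical self._load_saved_sessions() helper yielding
        # (session_id, runner) pairs -- neither exists in this module):
        #
        #     for session_id, runner in await self._load_saved_sessions():
        #         self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)
        #     self._ready.set()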
    def _to_model(self, data: Any) -> LLMServiceObj:
        """
        Accepts LLMServiceObj or dict and returns a validated LLMServiceObj.
        """
        if isinstance(data, LLMServiceObj):
            return data
        if isinstance(data, dict):
            return LLMServiceObj(**data)
        raise TypeError("LLMService expects an object payload (dict/LLMServiceObj).")
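    # Illustrative dict payload for _to_model (field names taken from the attributes used in this
    # module; the full LLMServiceObj schema lives in models.py and may require more fields):
    #
    #     {
    #         "RequestSessionId": "req-42",   # hypothetical id
    #         "LLMRunnerType": "TestLLM",     # hypothetical runner type
    #         "IsSystemLlm": False,
    #     }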
    async def _emit_result(
        self,
        obj: LLMServiceObj,
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
        include_llm_message: bool = True,
    ):
        """
        Build a ResultObj-style message on the wire, mirroring your .NET usage.
        check_system=True -> don't publish if obj.IsSystemLlm is True (matches your rule).
        """
        obj.ResultMessage = message
        obj.ResultSuccess = success
        if include_llm_message:
            obj.LlmMessage = f"<Success>{message}</Success>" if success else f"<Error>{message}</Error>"

        if check_system and obj.IsSystemLlm:
            return

        await self._pub.publish(queue, obj)
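    # Example (illustrative): _emit_result(llm, "Started", True, "llmServiceMessage") republishes
    # the same LLMServiceObj on "llmServiceMessage" with ResultMessage="Started",
    # ResultSuccess=True and LlmMessage="<Success>Started</Success>".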
    def _session_for(self, session_id: str) -> Optional[_Session]:
        return self._sessions.get(session_id)

    async def StartProcess(self, payload: Any):
        llm = self._to_model(payload)

        # Session keys are "<RequestSessionId>_<LLMRunnerType>", so one request id can hold one
        # session per runner type.
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id

        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._emit_result(
                llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
            )
            return

        sess = self._session_for(session_id)
        is_runner_null = (sess is None) or (sess.Runner is None)
        create_new = is_runner_null or sess.Runner.IsStateFailed

        if create_new:
            # Tear down a failed runner before replacing it.
            if sess and sess.Runner:
                try:
                    await sess.Runner.RemoveProcess(session_id)
                except Exception:
                    pass

            runner: ILLMRunner = await self._runner_factory(llm.model_dump())
            if not runner.IsEnabled:
                await self._emit_result(
                    llm,
                    f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
                    True,
                    "llmServiceMessage",
                )
                return

            await self._emit_result(
                llm, f"Starting {runner.Type} {settings.SERVICE_ID} Expert", True, "llmServiceMessage", check_system=True
            )

            await runner.StartProcess(llm.model_dump())

            self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)

            if settings.SERVICE_ID.lower() in {"monitor", "gradllm"}:
                await self._emit_result(
                    llm,
                    f"Hi, I'm {runner.Type}, your {settings.SERVICE_ID} Assistant. How can I help you?",
                    True,
                    "llmServiceMessage",
                    check_system=True,
                )

        await self._pub.publish("llmServiceStarted", llm)
    async def RemoveSession(self, payload: Any):
        llm = self._to_model(payload)
        # Strip the runner-type suffix so every runner session sharing this request id is removed.
        base = (llm.SessionId or "").split("_")[0]
        targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]

        msgs = []
        ok = True
        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
                continue
            try:
                await s.Runner.RemoveProcess(sid)
                s.Runner = None
                msgs.append(sid)
            except Exception as e:
                ok = False
                msgs.append(f"Error {sid}: {e}")

        if ok:
            await self._emit_result(
                llm,
                f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
                True,
                "llmSessionMessage",
                check_system=True,
            )
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")
    async def StopRequest(self, payload: Any):
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
            return

        await s.Runner.StopRequest(sid)
        await self._emit_result(
            llm,
            f"Success {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted",
            True,
            "llmServiceMessage",
            check_system=True,
        )

    async def UserInput(self, payload: Any):
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
            return

        r: ILLMRunner = s.Runner
        if r.IsStateStarting:
            await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
            return
        if r.IsStateFailed:
            await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
            return

        await r.SendInputAndGetResponse(llm.model_dump())

    async def QueryIndexResult(self, payload: Any):
        """
        Your .NET listener concatenates RAG outputs, sets ResultObj, and notifies the coordinator.
        Here, we forward a service message containing the same info so the UI can reflect completion.
        'payload' usually has: Success, Message, QueryResults: [{Output: "..."}]
        """
        try:
            data = payload if isinstance(payload, dict) else {}
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join(x.get("Output", "") for x in outputs if isinstance(x, dict))

            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(
                    Message=data.get("Message", ""),
                    Success=bool(data.get("Success", False)),
                    Data=rag_data,
                ),
            )
        except Exception as e:
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(Message=str(e), Success=False),
            )
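    # Illustrative QueryIndexResult payload (shape taken from the docstring above; the producer's
    # exact schema is assumed, not verified):
    #
    #     {
    #         "Success": True,
    #         "Message": "Query complete",
    #         "QueryResults": [{"Output": "chunk one"}, {"Output": "chunk two"}],
    #     }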
    async def GetFunctionRegistry(self, filtered: bool = False):
        """
        Wire up to your real registry when ready.
        For now, mimic your success message payload.
        """
        # 'filtered' is accepted for signature parity; it is unused until a real registry is wired in.
        catalog = "{}"
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
            ResultObj(Message=msg, Success=True),
        )
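

# ---------------------------------------------------------------------------
# Minimal wiring sketch (illustrative, not part of the service). It assumes only the publisher
# contract this module actually uses -- an object with an async publish(queue, obj) method -- and
# exercises GetFunctionRegistry, which needs no LLMServiceObj. A real deployment would pass a
# RabbitRepo and a runner_factory returning an ILLMRunner (IsEnabled, Type, state flags, and the
# async StartProcess / RemoveProcess / StopRequest / SendInputAndGetResponse methods).
# ---------------------------------------------------------------------------
if __name__ == "__main__":

    class _PrintPublisher:
        """Stand-in publisher that just prints what would go onto RabbitMQ."""

        async def publish(self, queue, obj):
            print(f"[{queue}] {obj}")

    async def _demo():
        # runner_factory is never called in this demo path, so a placeholder is enough here.
        service = LLMService(publisher=_PrintPublisher(), runner_factory=None)
        await service.init()
        await service.GetFunctionRegistry()

    asyncio.run(_demo())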