johnbridges committed on
Commit
2c8368f
·
1 Parent(s): 8690dbe
Files changed (4) hide show
  1. listener.py +20 -20
  2. models.py +149 -0
  3. rabbit_repo.py +30 -18
  4. runners/service.py +200 -80
listener.py CHANGED
@@ -1,47 +1,47 @@
1
  import json
2
- from typing import Callable, Dict, List, Optional
3
  import aio_pika
4
- from rabbit_base import RabbitBase
5
- from config import settings
6
 
7
- # Maps FuncName -> handler coroutine
8
- Handler = Callable[[dict], "awaitable[None]"]
9
 
10
- class RabbitListenerBase(RabbitBase):
11
- def __init__(self, service_id: str, handlers: Dict[str, Handler]):
12
- super().__init__()
13
- self._service_id = service_id
 
 
14
  self._handlers = handlers
15
  self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []
16
 
17
  def _qname(self, exchange: str, routing_keys: List[str]) -> str:
18
- rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk != ""])) or ""
19
  suffix = f"-{rk_part}" if rk_part else ""
20
- return f"{settings.RABBIT_INSTANCE_NAME}-{exchange}{suffix}"
21
 
22
  async def start(self, declarations: List[dict]):
23
- """
24
- declarations: list of {ExchangeName, FuncName, MessageTimeout, Type?, RoutingKeys?}
25
- """
26
  for d in declarations:
27
  exch = d["ExchangeName"]
28
- rks = d.get("RoutingKeys") or [settings.RABBIT_ROUTING_KEY]
29
  ttl = d.get("MessageTimeout") or None
30
- q = await self.declare_queue_bind(exchange=exch, queue_name=self._qname(exch, rks), routing_keys=rks, ttl_ms=ttl)
 
 
31
  await q.consume(self._make_consumer(d["FuncName"]))
32
  self._consumers.append(q)
33
 
34
  def _make_consumer(self, func_name: str):
35
  handler = self._handlers.get(func_name)
 
36
  async def _on_msg(msg: aio_pika.IncomingMessage):
37
  async with msg.process():
38
  try:
39
- # Expect CloudEvent JSON
40
  envelope = json.loads(msg.body.decode("utf-8"))
41
- data = envelope.get("data")
 
 
42
  if handler:
43
  await handler(data)
44
- except Exception as e:
45
- # swallow to avoid nack loops; your logger can capture details
46
  pass
 
47
  return _on_msg
 
1
  import json
 
2
  import aio_pika
3
+ from typing import Callable, Awaitable, Dict, Any, List, Optional
 
4
 
5
+ from models import CloudEvent
 
6
 
7
+ Handler = Callable[[Any], Awaitable[None]] # data can be dict / list / str
8
+
9
+ class RabbitListenerBase:
10
+ def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
11
+ self._base = base
12
+ self._instance_name = instance_name
13
  self._handlers = handlers
14
  self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []
15
 
16
  def _qname(self, exchange: str, routing_keys: List[str]) -> str:
17
+ rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk])) or ""
18
  suffix = f"-{rk_part}" if rk_part else ""
19
+ return f"{self._instance_name}-{exchange}{suffix}"
20
 
21
  async def start(self, declarations: List[dict]):
 
 
 
22
  for d in declarations:
23
  exch = d["ExchangeName"]
 
24
  ttl = d.get("MessageTimeout") or None
25
+ rks = d.get("RoutingKeys") or [""]
26
+ qname = self._qname(exch, rks)
27
+ q = await self._base.declare_queue_bind(exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl)
28
  await q.consume(self._make_consumer(d["FuncName"]))
29
  self._consumers.append(q)
30
 
31
  def _make_consumer(self, func_name: str):
32
  handler = self._handlers.get(func_name)
33
+
34
  async def _on_msg(msg: aio_pika.IncomingMessage):
35
  async with msg.process():
36
  try:
 
37
  envelope = json.loads(msg.body.decode("utf-8"))
38
+ # Validate basic CloudEvent shape without being strict
39
+ # (C# side doesn’t require strict validation either)
40
+ data = envelope.get("data", None)
41
  if handler:
42
  await handler(data)
43
+ except Exception:
44
+ # swallow to avoid redelivery storms; log if you wire a logger
45
  pass
46
+
47
  return _on_msg
models.py ADDED
@@ -0,0 +1,149 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
from typing import Any, Optional, List
from datetime import datetime, timezone

from pydantic import BaseModel, ConfigDict, Field
4
+
5
+
6
+ # ---------- CloudEvent ----------
7
class CloudEvent(BaseModel):
    """Minimal CloudEvents 1.0 envelope used on the RabbitMQ wire."""

    specversion: str = "1.0"
    id: str
    type: str
    source: str
    time: datetime
    datacontenttype: str = "application/json"
    data: Optional[Any] = None

    @staticmethod
    def now_iso() -> datetime:
        # Always timezone-aware UTC, never naive local time.
        return datetime.now(timezone.utc)

    @classmethod
    def wrap(cls, *, event_id: str, event_type: str, source: str, data: Any) -> "CloudEvent":
        """Build an envelope around *data*.

        Falls back to the payload's class name ("NullOrEmpty" for None) when
        *event_type* is empty, matching the C# publisher behaviour.
        """
        fallback_type = "NullOrEmpty" if data is None else type(data).__name__
        return cls(
            id=event_id,
            type=event_type or fallback_type,
            source=source,
            time=cls.now_iso(),
            data=data,
        )
29
+
30
+
31
+ # ---------- Permissive ancillary types you referenced ----------
32
class FunctionState(BaseModel):
    """The five boolean flags mirroring the C# function-call state machine."""

    IsFunctionCall: bool = False
    IsFunctionCallResponse: bool = False
    IsFunctionCallError: bool = False
    IsFunctionCallStatus: bool = False
    IsFunctionStillRunning: bool = False

    def set_tuple(self, call: bool, resp: bool, err: bool, status: bool, running: bool):
        """Set all five flags at once, like the C# SetFunctionState helper."""
        flag_names = (
            "IsFunctionCall",
            "IsFunctionCallResponse",
            "IsFunctionCallError",
            "IsFunctionCallStatus",
            "IsFunctionStillRunning",
        )
        for name, value in zip(flag_names, (call, resp, err, status, running)):
            setattr(self, name, value)
46
+
47
+
48
class FunctionCallData(BaseModel):
    """Permissive placeholder for the C# FunctionCallData payload.

    BUGFIX: the previous ``__root__: dict = Field(default_factory=dict)`` is
    pydantic-v1 custom-root syntax and raises at class-definition time under
    pydantic v2 (which this codebase uses — see ``model_dump_json`` in
    rabbit_repo.py).  An ``extra="allow"`` model keeps arbitrary keys and still
    serializes as a plain JSON object, matching the C# side.
    """

    model_config = ConfigDict(extra="allow")
51
+
52
+
53
class UserInfo(BaseModel):
    """Permissive placeholder for the C# UserInfo payload.

    BUGFIX: ``__root__`` custom-root fields are pydantic-v1 syntax and break
    under pydantic v2 (used here via ``model_dump_json``).  ``extra="allow"``
    accepts arbitrary keys while serializing as a plain JSON object.
    """

    model_config = ConfigDict(extra="allow")
56
+
57
+
58
+ # ---------- LLMServiceObj (field names match C# exactly) ----------
59
class LLMServiceObj(BaseModel):
    """Wire-compatible mirror of the C# LLMServiceObj (field names match exactly).

    C# ``Stack<T>`` members are serialized as plain JSON arrays, which the
    .NET deserializer accepts.
    """

    # ---- strings ----
    SessionId: str = ""
    JsonFunction: str = ""
    LlmMessage: str = ""
    ResultMessage: str = ""
    UserInput: str = ""
    RequestSessionId: str = ""
    FunctionName: str = ""
    TimeZone: str = ""
    LLMRunnerType: str = "TurboLLM"
    SourceLlm: str = ""
    DestinationLlm: str = ""
    MessageID: str = ""
    LlmSessionStartName: str = ""
    SwapFunctionName: str = ""
    ChatAgentLocation: str = ""
    ToolsDefinitionId: Optional[str] = None
    JsonToolsBuilderSpec: Optional[str] = None

    # ---- ints / bools ----
    TokensUsed: int = 0
    IsUserLoggedIn: bool = False
    IsFuncAck: bool = False
    IsProcessed: bool = False
    IsSystemLlm: bool = False
    Timeout: Optional[int] = None
    # BUGFIX: LLMService._emit_result assigns obj.ResultSuccess, but no such
    # field was declared; pydantic rejects assignment to undeclared fields,
    # so every result emission would raise.  Declared here to match usage.
    ResultSuccess: bool = False

    # ---- complex ----
    FunctionCallId: str = ""
    FunctionCallData: FunctionCallData = Field(default_factory=FunctionCallData)
    UserInfo: UserInfo = Field(default_factory=UserInfo)
    StartTimeUTC: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))

    # ---- stacks (arrays on the wire; C# Stack<T> reads them fine) ----
    LlmStack: List[str] = Field(default_factory=list)
    FunctionCallIdStack: List[str] = Field(default_factory=list)
    FunctionNameStack: List[str] = Field(default_factory=list)
    IsProcessedStack: List[bool] = Field(default_factory=list)
    MessageIDStack: List[str] = Field(default_factory=list)

    # ---- function state (flattened booleans from the C# FunctionState) ----
    IsFunctionCall: bool = False
    IsFunctionCallResponse: bool = False
    IsFunctionCallError: bool = False
    IsFunctionCallStatus: bool = False
    IsFunctionStillRunning: bool = False

    # ---- convenience helpers (optional) ----
    def push_llm(self, llm_name: str, new_call_id: str, new_func_name: str,
                 new_message_id: str, new_is_processed: bool):
        """Push the current routing/call identifiers onto their stacks and
        install the new ones (mirrors the C# push behaviour)."""
        if self.SourceLlm:
            self.LlmStack.append(self.SourceLlm)
        self.SourceLlm = self.DestinationLlm
        self.DestinationLlm = llm_name

        if self.MessageID:
            self.MessageIDStack.append(self.MessageID)
        self.MessageID = new_message_id

        if self.FunctionCallId:
            self.FunctionCallIdStack.append(self.FunctionCallId)
        self.FunctionCallId = new_call_id

        if self.FunctionName:
            self.FunctionNameStack.append(self.FunctionName)
        self.FunctionName = new_func_name

        # IsProcessed is always pushed, even when falsy.
        self.IsProcessedStack.append(self.IsProcessed)
        self.IsProcessed = new_is_processed

    def pop_llm(self):
        """Restore the previous identifiers from the stacks (mirrors the C# pop).

        NOTE(review): DestinationLlm is set equal to the popped SourceLlm, as in
        the original code — confirm this matches the C# behaviour.
        """
        if self.LlmStack:
            self.SourceLlm = self.LlmStack.pop()
            self.DestinationLlm = self.SourceLlm

        if self.MessageIDStack:
            self.MessageID = self.MessageIDStack.pop()
        if self.FunctionCallIdStack:
            self.FunctionCallId = self.FunctionCallIdStack.pop()
        if self.FunctionNameStack:
            self.FunctionName = self.FunctionNameStack.pop()
        if self.IsProcessedStack:
            self.IsProcessed = self.IsProcessedStack.pop()
143
+
144
+
145
+ # ---------- ResultObj ----------
146
class ResultObj(BaseModel):
    """Simple success/message/data envelope mirroring the C# ResultObj."""

    Message: str = ""
    Success: bool = False
    Data: Optional[Any] = None
rabbit_repo.py CHANGED
@@ -1,12 +1,13 @@
1
  import uuid
2
- from typing import Any
3
- from rabbit_base import RabbitBase
4
- from cloud_event import CloudEvent
5
- from config import settings
6
  from utils import to_json, json_compress_str
7
 
8
- class RabbitRepo(RabbitBase):
9
- def __init__(self, external_source: str):
 
10
  super().__init__(exchange_type_resolver=self._resolve_type)
11
  self._external_source = external_source # like SystemUrl.ExternalUrl
12
 
@@ -18,17 +19,28 @@ class RabbitRepo(RabbitBase):
18
  return settings.RABBIT_EXCHANGE_TYPE
19
 
20
  async def publish(self, exchange: str, obj: Any, routing_key: str = ""):
21
- ex = await self.ensure_exchange(exchange)
22
- payload = CloudEvent.wrap(obj, event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
23
- source=self._external_source, id=str(uuid.uuid4()))
24
- await ex.publish(aio_pika.Message(body=payload), routing_key=routing_key)
 
 
 
 
 
 
 
 
 
 
 
25
 
26
- async def publish_jsonz(self, exchange: str, obj: Any, routing_key: str = "", with_id: str | None = None):
27
- ex = await self.ensure_exchange(exchange)
28
- json_str = to_json(obj)
29
- datajsonZ = json_compress_str(json_str)
30
- to_send = (datajsonZ, with_id) if with_id else datajsonZ
31
- payload = CloudEvent.wrap(to_send, event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
32
- source=self._external_source, id=str(uuid.uuid4()))
33
- await ex.publish(aio_pika.Message(body=payload), routing_key=routing_key)
34
  return datajsonZ
 
1
  import uuid
2
+ from typing import Any, Optional
3
+ import aio_pika
4
+
5
+ from models import CloudEvent
6
  from utils import to_json, json_compress_str
7
 
8
+
9
class RabbitRepo:
    """Publishes objects to RabbitMQ exchanges wrapped in CloudEvent envelopes.

    Connection plumbing lives on an injected RabbitBase-like object exposing
    ``ensure_exchange(name)``, mirroring how RabbitListenerBase is composed.
    """

    def __init__(self, external_source: str, base=None):
        # BUGFIX: this class no longer inherits RabbitBase, so the old
        # super().__init__(exchange_type_resolver=self._resolve_type) raised
        # TypeError (object.__init__ accepts no kwargs).  The base collaborator
        # is now injected instead (backward-compatible keyword argument),
        # matching RabbitListenerBase.
        self._base = base
        # Like SystemUrl.ExternalUrl in the C# code.  BUGFIX: publish() and
        # publish_jsonz() read self._source, which was never assigned; keep
        # both names pointing at the same value.
        self._external_source = external_source
        self._source = external_source

    def _resolve_type(self, exchange: str) -> str:
        """Exchange-type policy hook (passed to the base when it declares)."""
        # Local import: the module no longer imports settings at top level.
        from config import settings
        return settings.RABBIT_EXCHANGE_TYPE

    async def publish(self, exchange: str, obj: Any, routing_key: str = ""):
        """Wrap *obj* in a CloudEvent and publish it to *exchange*."""
        ex = await self._base.ensure_exchange(exchange)
        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
            source=self._source,
            data=obj,
        )
        body = evt.model_dump_json(by_alias=False, exclude_none=True).encode("utf-8")
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)

    async def publish_jsonz(self, exchange: str, obj: Any, routing_key: str = "",
                            with_id: Optional[str] = None) -> str:
        """Publish *obj* as compressed JSON (optionally paired with *with_id*).

        Returns the compressed JSON string so callers can correlate replies.
        """
        ex = await self._base.ensure_exchange(exchange)
        datajson = to_json(obj)
        datajsonZ = json_compress_str(datajson)
        # When an id accompanies the payload, ship them as a 2-tuple (JSON array).
        payload: Any = (datajsonZ, with_id) if with_id else datajsonZ

        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
            source=self._source,
            data=payload,
        )
        body = evt.model_dump_json(by_alias=False, exclude_none=True).encode("utf-8")
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
        return datajsonZ
runners/service.py CHANGED
@@ -1,127 +1,247 @@
 
1
  import asyncio
2
- from typing import Dict, Optional
3
- from collections import defaultdict
4
 
5
- from rabbit_repo import RabbitRepo
6
  from config import settings
 
 
7
  from runners.base import ILLMRunner
8
 
 
 
 
 
 
 
 
9
  class LLMService:
 
 
 
 
10
  def __init__(self, publisher: RabbitRepo, runner_factory):
11
  self._pub = publisher
12
- self._runner_factory = runner_factory
13
- self._sessions: Dict[str, dict] = {} # sessionId -> {"Runner": ILLMRunner, "FullSessionId": str}
14
  self._ready = asyncio.Event()
15
- self._ready.set() # if you have async load, clear and set after
 
16
 
17
  async def init(self):
18
- # If you have history to load, do here then self._ready.set()
 
 
 
 
 
 
19
  pass
20
 
21
- async def _set_result(self, obj: dict, message: str, success: bool, queue: str, check_system: bool=False):
22
- obj["ResultMessage"] = message
23
- obj["ResultSuccess"] = success
24
- obj["LlmMessage"] = (f"<Success>{message}</Success>" if success else f"<Error>{message}</Error>")
25
- # mirror your .NET rule (don’t publish for system llm if check_system is True)
26
- if not (check_system and obj.get("IsSystemLlm")):
27
- await self._pub.publish(queue, obj)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
28
 
29
- async def StartProcess(self, llmServiceObj: dict):
30
- session_id = f"{llmServiceObj['RequestSessionId']}_{llmServiceObj['LLMRunnerType']}"
31
- llmServiceObj["SessionId"] = session_id
32
 
33
- # wait ready (max ~120s like .NET)
34
  try:
35
  await asyncio.wait_for(self._ready.wait(), timeout=120)
36
  except asyncio.TimeoutError:
37
- await self._set_result(llmServiceObj, "Timed out waiting for initialization.", False, "llmServiceMessage", True)
 
 
38
  return
39
 
40
- sess = self._sessions.get(session_id)
41
- is_runner_null = not sess or not sess.get("Runner")
 
42
 
43
- create_new = is_runner_null or sess["Runner"].IsStateFailed
44
  if create_new:
45
- if sess and sess.get("Runner"):
 
46
  try:
47
- await sess["Runner"].RemoveProcess(session_id)
48
- except: pass
 
49
 
50
- runner: ILLMRunner = await self._runner_factory(llmServiceObj)
 
51
  if not runner.IsEnabled:
52
- await self._set_result(llmServiceObj, f"{llmServiceObj['LLMRunnerType']} {settings.SERVICE_ID} not started as it is disabled.", True, "llmServiceMessage")
 
 
 
 
 
53
  return
54
 
55
- await self._set_result(llmServiceObj, f"Starting {runner.Type} {settings.SERVICE_ID} Expert", True, "llmServiceMessage", True)
56
- await runner.StartProcess(llmServiceObj)
 
57
 
58
- self._sessions[session_id] = {"Runner": runner, "FullSessionId": session_id}
59
- if settings.SERVICE_ID == "monitor":
60
- await self._set_result(llmServiceObj, f"Hi i'm {runner.Type} your Network Monitor Assistant. How can I help you.", True, "llmServiceMessage", True)
61
 
62
- await self._pub.publish("llmServiceStarted", llmServiceObj)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
63
 
64
- async def RemoveSession(self, llmServiceObj: dict):
65
- # Behaves like your RemoveAllSessionIdProcesses (prefix match)
66
- base = llmServiceObj.get("SessionId","").split("_")[0]
67
- targets = [k for k in self._sessions.keys() if k.startswith(base + "_")]
68
  msgs = []
69
  ok = True
70
  for sid in targets:
71
  s = self._sessions.get(sid)
72
- if s and s.get("Runner"):
73
- try:
74
- await s["Runner"].RemoveProcess(sid)
75
- s["Runner"] = None
76
- msgs.append(sid)
77
- except Exception as e:
78
- ok = False
79
- msgs.append(f"Error {sid}: {e}")
 
 
80
  if ok:
81
- await self._set_result(llmServiceObj, f"Success: Removed sessions for {' '.join(msgs)}", True, "llmSessionMessage", True)
 
 
 
 
 
 
82
  else:
83
- await self._set_result(llmServiceObj, " ".join(msgs), False, "llmServiceMessage")
84
-
85
- async def StopRequest(self, llmServiceObj: dict):
86
- sid = llmServiceObj.get("SessionId","")
87
- s = self._sessions.get(sid)
88
- if not s or not s.get("Runner"):
89
- await self._set_result(llmServiceObj, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
 
90
  return
91
- await s["Runner"].StopRequest(sid)
92
- await self._set_result(llmServiceObj, f"Success {s['Runner'].Type} {settings.SERVICE_ID} Assistant output has been halted", True, "llmServiceMessage", True)
93
-
94
- async def UserInput(self, llmServiceObj: dict):
95
- sid = llmServiceObj.get("SessionId","")
96
- s = self._sessions.get(sid)
97
- if not s or not s.get("Runner"):
98
- await self._set_result(llmServiceObj, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
 
 
 
 
 
 
 
 
99
  return
100
- r: ILLMRunner = s["Runner"]
 
101
  if r.IsStateStarting:
102
- await self._set_result(llmServiceObj, "Please wait, the assistant is starting...", False, "llmServiceMessage")
103
  return
104
  if r.IsStateFailed:
105
- await self._set_result(llmServiceObj, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
106
  return
107
- await r.SendInputAndGetResponse(llmServiceObj)
108
- # emitter side can push partials directly to queues if desired
109
 
110
- async def QueryIndexResult(self, queryIndexRequest: dict):
111
- # Adapted to your behavior: concatenate outputs, publish completion via internal coordinator if needed
 
 
 
 
 
 
 
112
  try:
113
- rag_data = "\n".join([qr.get("Output","") for qr in (queryIndexRequest.get("QueryResults") or [])])
114
- # You signal _queryCoordinator.CompleteQuery in .NET; here you may forward/publish result…
115
- # Example: include rag data in a service message to the session
116
- await self._pub.publish("llmServiceMessage", {
117
- "ResultSuccess": queryIndexRequest.get("Success", False),
118
- "ResultMessage": queryIndexRequest.get("Message",""),
119
- "Data": rag_data,
120
- })
 
 
 
 
 
121
  except Exception as e:
122
- await self._pub.publish("llmServiceMessage", {"ResultSuccess": False, "ResultMessage": str(e)})
 
 
 
123
 
124
  async def GetFunctionRegistry(self, filtered: bool = False):
125
- # Plug in your registry
126
- data = {"FunctionCatalogJson": "{}", "Filtered": filtered}
127
- await self._pub.publish("llmServiceMessage", {"ResultSuccess": True, "ResultMessage": f"Success : Got GetFunctionCatalogJson : {data}"})
 
 
 
 
 
 
 
 
1
+ # service.py
2
  import asyncio
3
+ from dataclasses import dataclass
4
+ from typing import Any, Dict, Optional
5
 
 
6
  from config import settings
7
+ from models import LLMServiceObj, ResultObj
8
+ from rabbit_repo import RabbitRepo
9
  from runners.base import ILLMRunner
10
 
11
+
12
@dataclass
class _Session:
    """Per-session record: the live runner (None once removed) and its full id."""

    Runner: "Optional[ILLMRunner]"
    FullSessionId: str
16
+
17
+
18
class LLMService:
    """
    Python/Gradio equivalent of the .NET LLMService.
    Keeps identical field names and queue semantics when talking to RabbitMQ.
    """

    def __init__(self, publisher: RabbitRepo, runner_factory):
        self._pub = publisher
        self._runner_factory = runner_factory  # async factory: dict -> ILLMRunner
        self._sessions: Dict[str, _Session] = {}
        self._ready = asyncio.Event()
        # If async loading is needed, call self._ready.clear() and set it later.
        self._ready.set()

    async def init(self):
        """
        Hook to preload history/sessions if needed; call self._ready.set() when finished.
        """
        # Example:
        # self._ready.clear()
        # await load_history()
        # self._ready.set()
        pass

    # ---------------------------- helpers ----------------------------

    def _to_model(self, data: Any) -> LLMServiceObj:
        """
        Accepts LLMServiceObj or dict and returns a validated LLMServiceObj.
        """
        if isinstance(data, LLMServiceObj):
            return data
        if isinstance(data, dict):
            return LLMServiceObj(**data)
        # If the pipeline ever sends compressed strings here, decompress+parse first.
        raise TypeError("LLMService expects an object payload (dict/LLMServiceObj).")

    async def _emit_result(
        self,
        obj: LLMServiceObj,
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
        include_llm_message: bool = True
    ):
        """
        Stamp result fields onto *obj* and publish it, mirroring the .NET usage.
        check_system=True -> don't publish when obj.IsSystemLlm is True.
        """
        obj.ResultMessage = message
        obj.ResultSuccess = success
        if include_llm_message:
            obj.LlmMessage = f"<Success>{message}</Success>" if success else f"<Error>{message}</Error>"

        if check_system and obj.IsSystemLlm:
            return

        # LLMServiceObj goes out on "llmServiceMessage"/"llmSessionMessage" so
        # the coordinator can show the assistant text and statuses.
        await self._pub.publish(queue, obj)

    def _session_for(self, session_id: str) -> Optional[_Session]:
        """Lookup helper; returns None when the session is unknown."""
        return self._sessions.get(session_id)

    # ---------------------------- API methods ----------------------------

    async def StartProcess(self, payload: Any):
        """Create (or reuse) the runner for this session and announce startup."""
        llm = self._to_model(payload)

        # Session id mirrors C#: RequestSessionId + "_" + LLMRunnerType.
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id

        # Wait ready (max 120s) exactly like the C# logic.
        try:
            await asyncio.wait_for(self._ready.wait(), timeout=120)
        except asyncio.TimeoutError:
            await self._emit_result(
                llm, "Timed out waiting for initialization.", False, "llmServiceMessage", check_system=True
            )
            return

        sess = self._session_for(session_id)
        runner_missing = sess is None or sess.Runner is None
        # Recreate when there is no runner or the existing one has failed.
        if not runner_missing and not sess.Runner.IsStateFailed:
            return

        # Tear down any previous (failed) runner before replacing it.
        if sess and sess.Runner:
            try:
                await sess.Runner.RemoveProcess(session_id)
            except Exception:
                pass

        # Create runner from the async factory.
        runner: ILLMRunner = await self._runner_factory(llm.model_dump())
        if not runner.IsEnabled:
            await self._emit_result(
                llm,
                f"{llm.LLMRunnerType} {settings.SERVICE_ID} not started as it is disabled.",
                True,
                "llmServiceMessage",
            )
            return

        await self._emit_result(
            llm, f"Starting {runner.Type} {settings.SERVICE_ID} Expert", True, "llmServiceMessage", check_system=True
        )

        await runner.StartProcess(llm.model_dump())

        self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)

        # Friendly greeting, gated by service id.
        if settings.SERVICE_ID.lower() in {"monitor", "gradllm"}:
            await self._emit_result(
                llm,
                f"Hi i'm {runner.Type} your {settings.SERVICE_ID} Assistant. How can I help you.",
                True,
                "llmServiceMessage",
                check_system=True,
            )

        # Notify "started"
        await self._pub.publish("llmServiceStarted", llm)

    async def RemoveSession(self, payload: Any):
        """Remove every runner whose session id shares this request's prefix."""
        llm = self._to_model(payload)
        base = (llm.SessionId or "").split("_")[0]
        targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]

        msgs = []
        ok = True
        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
                # BUGFIX: drop already-dead entries instead of leaking them.
                self._sessions.pop(sid, None)
                continue
            try:
                await s.Runner.RemoveProcess(sid)
                # BUGFIX: the entry previously stayed in the dict with
                # Runner=None forever (unbounded growth); remove it outright.
                # StartProcess treats a missing entry the same as Runner=None.
                self._sessions.pop(sid, None)
                msgs.append(sid)
            except Exception as e:
                ok = False
                msgs.append(f"Error {sid}: {e}")

        if ok:
            await self._emit_result(
                llm,
                f"Success: Removed sessions for {' '.join(msgs) if msgs else '(none)'}",
                True,
                "llmSessionMessage",
                check_system=True,
            )
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")

    async def StopRequest(self, payload: Any):
        """Halt the session's runner output, if a runner exists."""
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: Runner missing for session {sid}.", False, "llmServiceMessage")
            return

        await s.Runner.StopRequest(sid)
        await self._emit_result(
            llm,
            f"Success {s.Runner.Type} {settings.SERVICE_ID} Assistant output has been halted",
            True,
            "llmServiceMessage",
            check_system=True,
        )

    async def UserInput(self, payload: Any):
        """Forward user input to the session's runner, guarding runner state."""
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
        if not s or not s.Runner:
            await self._emit_result(llm, f"Error: SessionId {sid} has no running process.", False, "llmServiceMessage")
            return

        r: ILLMRunner = s.Runner
        if r.IsStateStarting:
            await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
            return
        if r.IsStateFailed:
            await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
            return

        # The runner pushes partials itself if desired.
        await r.SendInputAndGetResponse(llm.model_dump())

    async def QueryIndexResult(self, payload: Any):
        """
        The .NET listener concatenates RAG outputs, sets ResultObj, and notifies
        the coordinator.  Here we forward a service message with the same info.
        'payload' usually has: Success, Message, QueryResults: [{Output: "..."}]
        """
        try:
            data = payload if isinstance(payload, dict) else {}
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join([x.get("Output", "") for x in outputs if isinstance(x, dict)])

            # Shape compatible with coordinator expectations.
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(
                    Message=data.get("Message", ""),
                    Success=bool(data.get("Success", False)),
                    Data=rag_data,
                )
            )
        except Exception as e:
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(Message=str(e), Success=False)
            )

    async def GetFunctionRegistry(self, filtered: bool = False):
        """
        Wire up to the real registry when ready; for now mimic the success
        message payload from the .NET side.
        """
        catalog = "{}"  # replace with real JSON
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
            ResultObj(Message=msg, Success=True)
        )