johnbridges committed on
Commit 9ad2a81 · 1 Parent(s): 2c8368f
Files changed (1)
  1. runners/service.py +58 -51
runners/service.py CHANGED
@@ -1,7 +1,7 @@
# service.py
import asyncio
from dataclasses import dataclass
- from typing import Any, Dict, Optional
+ from typing import Any, Dict, Optional, Callable, Awaitable

from config import settings
from models import LLMServiceObj, ResultObj
@@ -20,72 +20,76 @@ class LLMService:
    Python/Gradio equivalent of your .NET LLMService.
    Keeps identical field names and queue semantics when talking to RabbitMQ.
    """
-     def __init__(self, publisher: RabbitRepo, runner_factory):
-         self._pub = publisher
-         self._runner_factory = runner_factory # async factory: dict|LLMServiceObj -> ILLMRunner
+     def __init__(
+         self,
+         publisher: RabbitRepo,
+         runner_factory: Callable[[Dict[str, Any]], Awaitable[ILLMRunner]],
+     ):
+         self._pub: RabbitRepo = publisher
+         self._runner_factory = runner_factory # async factory: dict -> ILLMRunner
        self._sessions: Dict[str, _Session] = {}
        self._ready = asyncio.Event()
-         # If you need async load (history, etc.), call self._ready.clear() and later set
-         self._ready.set()
+         self._ready.set() # call clear()/set() if you preload history
+         self._service_id_lc = settings.SERVICE_ID.lower()

-     async def init(self):
-         """
-         Hook to preload history/sessions if needed; call self._ready.set() when finished.
-         """
-         # Example:
-         # self._ready.clear()
-         # await load_history()
-         # self._ready.set()
+     async def init(self) -> None:
+         """Hook to preload history/sessions; call self._ready.set() when finished."""
        pass

    # ---------------------------- helpers ----------------------------

    def _to_model(self, data: Any) -> LLMServiceObj:
-         """
-         Accepts LLMServiceObj or dict and returns a validated LLMServiceObj.
-         """
+         """Accepts LLMServiceObj or dict and returns a validated LLMServiceObj."""
        if isinstance(data, LLMServiceObj):
            return data
        if isinstance(data, dict):
            return LLMServiceObj(**data)
-         # If your pipeline ever sends compressed strings here, decompress+parse first.
        raise TypeError("LLMService expects an object payload (dict/LLMServiceObj).")

    async def _emit_result(
        self,
-         obj: LLMServiceObj,
+         obj: LLMServiceObj | Dict[str, Any],
        message: str,
        success: bool,
        queue: str,
        *,
        check_system: bool = False,
-         include_llm_message: bool = True
-     ):
+         include_llm_message: bool = True,
+     ) -> None:
        """
        Build a ResultObj-style message on the wire, mirroring your .NET usage.
        check_system=True -> don't publish if obj.IsSystemLlm is True (matches your rule).
        """
-         obj.ResultMessage = message
-         obj.ResultSuccess = success
+         llm = obj if isinstance(obj, LLMServiceObj) else LLMServiceObj(**obj)
+
+         llm.ResultMessage = message
+         llm.ResultSuccess = success
        if include_llm_message:
-             obj.LlmMessage = f"<Success>{message}</Success>" if success else f"<Error>{message}</Error>"
+             llm.LlmMessage = f"<Success>{message}</Success>" if success else f"<Error>{message}</Error>"

-         if check_system and obj.IsSystemLlm:
+         if check_system and llm.IsSystemLlm:
            return

        # You publish LLMServiceObj on "llmServiceMessage"/"llmSessionMessage" in .NET
-         # That lets the coordinator show the assistant text and statuses.
-         await self._pub.publish(queue, obj)
+         await self._pub.publish(queue, llm)

    def _session_for(self, session_id: str) -> Optional[_Session]:
        return self._sessions.get(session_id)

    # ---------------------------- API methods ----------------------------

-     async def StartProcess(self, payload: Any):
+     async def StartProcess(self, payload: Any) -> None:
        llm = self._to_model(payload)

-         # Construct Python-side session id like C#: RequestSessionId + "_" + LLMRunnerType
+         # Validate critical fields
+         if not llm.RequestSessionId:
+             await self._emit_result(llm, "Error: RequestSessionId is required.", False, "llmServiceMessage")
+             return
+         if not llm.LLMRunnerType:
+             await self._emit_result(llm, "Error: LLMRunnerType is required.", False, "llmServiceMessage")
+             return
+
+         # Construct session id like C#: RequestSessionId + "_" + LLMRunnerType
        session_id = f"{llm.RequestSessionId}_{llm.LLMRunnerType}"
        llm.SessionId = session_id

@@ -99,19 +103,19 @@ class LLMService:
            return

        sess = self._session_for(session_id)
-         is_runner_null = (sess is None) or (sess.Runner is None)
-         create_new = is_runner_null or (sess and sess.Runner and sess.Runner.IsStateFailed)
+         runner = sess.Runner if sess else None
+         create_new = (runner is None) or getattr(runner, "IsStateFailed", False)

        if create_new:
            # Remove previous runner if exists
-             if sess and sess.Runner:
+             if runner:
                try:
-                     await sess.Runner.RemoveProcess(session_id)
+                     await runner.RemoveProcess(session_id)
                except Exception:
                    pass

-             # Create runner from factory
-             runner: ILLMRunner = await self._runner_factory(llm.model_dump())
+             # Create runner from factory (pass a plain dict for decoupling)
+             runner = await self._runner_factory(llm.model_dump())
            if not runner.IsEnabled:
                await self._emit_result(
                    llm,
@@ -129,8 +133,8 @@

        self._sessions[session_id] = _Session(Runner=runner, FullSessionId=session_id)

-         # Mirror your friendly greeting, gated by service id (you renamed yours to gradllm)
-         if settings.SERVICE_ID.lower() in {"monitor", "gradllm"}:
+         # Friendly greeting for your renamed service
+         if self._service_id_lc in {"monitor", "gradllm"}:
            await self._emit_result(
                llm,
                f"Hi i'm {runner.Type} your {settings.SERVICE_ID} Assistant. How can I help you.",
@@ -139,16 +143,20 @@
                check_system=True,
            )

-         # Notify "started"
+         # Notify "started" (full LLMServiceObj)
        await self._pub.publish("llmServiceStarted", llm)

-     async def RemoveSession(self, payload: Any):
+     async def RemoveSession(self, payload: Any) -> None:
        llm = self._to_model(payload)
        base = (llm.SessionId or "").split("_")[0]
-         targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
+         if not base:
+             await self._emit_result(llm, "Error: SessionId is required to remove sessions.", False, "llmServiceMessage")
+             return

-         msgs = []
+         targets = [k for k in list(self._sessions.keys()) if k.startswith(base + "_")]
+         msgs: list[str] = []
        ok = True
+
        for sid in targets:
            s = self._sessions.get(sid)
            if not s or not s.Runner:
@@ -172,7 +180,7 @@
        else:
            await self._emit_result(llm, " ".join(msgs), False, "llmServiceMessage")

-     async def StopRequest(self, payload: Any):
+     async def StopRequest(self, payload: Any) -> None:
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
@@ -189,7 +197,7 @@
            check_system=True,
        )

-     async def UserInput(self, payload: Any):
+     async def UserInput(self, payload: Any) -> None:
        llm = self._to_model(payload)
        sid = llm.SessionId or ""
        s = self._session_for(sid)
@@ -198,17 +206,17 @@
            return

        r: ILLMRunner = s.Runner
-         if r.IsStateStarting:
+         if getattr(r, "IsStateStarting", False):
            await self._emit_result(llm, "Please wait, the assistant is starting...", False, "llmServiceMessage")
            return
-         if r.IsStateFailed:
+         if getattr(r, "IsStateFailed", False):
            await self._emit_result(llm, "The Assistant is stopped. Try reloading.", False, "llmServiceMessage")
            return

        # Let runner push partials itself if desired; we still return a small ack
        await r.SendInputAndGetResponse(llm.model_dump())

-     async def QueryIndexResult(self, payload: Any):
+     async def QueryIndexResult(self, payload: Any) -> None:
        """
        Your .NET listener concatenates RAG outputs, sets ResultObj, and notifies the coordinator.
        Here, we forward a service message containing the same info so the UI can reflect completion.
@@ -219,22 +227,21 @@
            outputs = data.get("QueryResults") or []
            rag_data = "\n".join([x.get("Output", "") for x in outputs if isinstance(x, dict)])

-             # Shape compatible with your coordinator expectations
            await self._pub.publish(
                "llmServiceMessage",
                ResultObj(
                    Message=data.get("Message", ""),
                    Success=bool(data.get("Success", False)),
                    Data=rag_data,
-                 )
+                 ),
            )
        except Exception as e:
            await self._pub.publish(
                "llmServiceMessage",
-                 ResultObj(Message=str(e), Success=False)
+                 ResultObj(Message=str(e), Success=False),
            )

-     async def GetFunctionRegistry(self, filtered: bool = False):
+     async def GetFunctionRegistry(self, filtered: bool = False) -> None:
        """
        Wire up to your real registry when ready.
        For now, mimic your success message payload.
@@ -243,5 +250,5 @@
        msg = f"Success : Got GetFunctionCatalogJson : {catalog}"
        await self._pub.publish(
            "llmServiceMessage",
-             ResultObj(Message=msg, Success=True)
+             ResultObj(Message=msg, Success=True),
        )
 
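For orientation only (not part of the commit), a minimal sketch of how the new constructor signature could be exercised. PrintPublisher, DummyRunner, runner_factory, and the "UserInput" payload field are hypothetical stand-ins; RabbitRepo, ILLMRunner, LLMServiceObj, RequestSessionId, LLMRunnerType, and the queue names come from the repo's own modules, and the sketch assumes LLMServiceObj validates a plain dict of those fields.

# usage_sketch.py -- illustrative only; assumes the repo layout shown in this diff.
import asyncio
from typing import Any, Dict

from runners.service import LLMService  # the class changed in this commit


class PrintPublisher:
    """Hypothetical stand-in for RabbitRepo: prints instead of publishing to RabbitMQ."""
    async def publish(self, queue: str, obj: Any) -> None:
        print(f"[{queue}] {obj!r}")


class DummyRunner:
    """Hypothetical runner exposing just the attributes LLMService reads."""
    Type = "dummy"
    IsEnabled = True
    IsStateStarting = False
    IsStateFailed = False

    async def SendInputAndGetResponse(self, payload: Dict[str, Any]) -> None:
        print("runner received:", payload)

    async def RemoveProcess(self, session_id: str) -> None:
        print("runner removed:", session_id)


async def runner_factory(payload: Dict[str, Any]) -> DummyRunner:
    # Satisfies Callable[[Dict[str, Any]], Awaitable[ILLMRunner]] from the new __init__.
    return DummyRunner()


async def main() -> None:
    svc = LLMService(publisher=PrintPublisher(), runner_factory=runner_factory)
    await svc.init()
    # Field names mirror the diff; whether LLMServiceObj accepts a partial dict
    # like this depends on the models module and is assumed here.
    await svc.StartProcess({"RequestSessionId": "abc", "LLMRunnerType": "dummy"})
    await svc.UserInput({"SessionId": "abc_dummy", "UserInput": "hello"})


if __name__ == "__main__":
    asyncio.run(main())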