johnbridges committed
Commit 8d27c84 · 1 Parent(s): ee97085

added test llm runner

Files changed (8)
  1. app.py +3 -13
  2. factory.py +6 -0
  3. function_tracker.py +42 -0
  4. message_helper.py +5 -0
  5. models.py +34 -0
  6. runners/echo.py +46 -0
  7. service.py +10 -16
  8. streaming.py +20 -0
app.py CHANGED
@@ -10,6 +10,7 @@ from listener import RabbitListenerBase
 from rabbit_repo import RabbitRepo
 from service import LLMService
 from runners.base import ILLMRunner
+from factory import default_runner_factory

 # ---------- logging ----------
 logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
@@ -25,21 +26,10 @@ try:
 except Exception:
     def gpu_entrypoint() -> str:
         return "gpu: not available (CPU only)"
-
-# ---------- Runner factory (stub) ----------
-class EchoRunner(ILLMRunner):
-    Type = "EchoRunner"
-    async def StartProcess(self, llmServiceObj: dict): pass
-    async def RemoveProcess(self, sessionId: str): pass
-    async def StopRequest(self, sessionId: str): pass
-    async def SendInputAndGetResponse(self, llmServiceObj: dict): pass
-
-async def runner_factory(llmServiceObj: dict) -> ILLMRunner:
-    return EchoRunner()
-
+
 # ---------- Publisher & Service ----------
 publisher = RabbitRepo(external_source="https://space.external")
-service = LLMService(publisher, runner_factory)
+service = LLMService(publisher, default_runner_factory)

 # ---------- Handlers (.NET FuncName -> service) ----------
 async def h_start(data): await service.StartProcess(data or {})
factory.py ADDED
@@ -0,0 +1,8 @@
+# factory.py
+from typing import Any, Dict
+from runners.base import ILLMRunner
+from runners.echo import EchoRunner
+
+async def default_runner_factory(context: Dict[str, Any]) -> ILLMRunner:
+    # choose runner by context["LLMRunnerType"] if you need variants
+    return EchoRunner(publisher=context["_publisher"], settings=context["_settings"])
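How the factory gets invoked is only implied by this diff; a minimal sketch, assuming the service injects its publisher and settings into the context dict under the "_publisher" / "_settings" keys that EchoRunner.__init__ consumes:

import asyncio
from factory import default_runner_factory

class StubPublisher:
    # stand-in for RabbitRepo; only the publish coroutine is needed here
    async def publish(self, exchange: str, obj) -> None:
        print(exchange, obj)

async def main() -> None:
    context = {"LLMRunnerType": "TurboLLM", "_publisher": StubPublisher(), "_settings": {}}
    runner = await default_runner_factory(context)
    await runner.StartProcess({})  # EchoRunner just logs at this point

asyncio.run(main())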
function_tracker.py ADDED
@@ -0,0 +1,42 @@
+# function_tracker.py
+from __future__ import annotations
+from dataclasses import dataclass
+from typing import Dict, List
+import random
+
+@dataclass
+class TrackedCall:
+    FunctionCallId: str
+    FunctionName: str
+    IsProcessed: bool = False
+    Payload: str = ""
+
+class FunctionCallTracker:
+    def __init__(self) -> None:
+        self._by_msg: Dict[str, Dict[str, TrackedCall]] = {}
+
+    @staticmethod
+    def gen_id() -> str:
+        return f"call_{random.randint(10_000_000, 99_999_999)}"
+
+    def add(self, message_id: str, fn_name: str, payload: str) -> str:
+        call_id = self.gen_id()
+        self._by_msg.setdefault(message_id, {})[call_id] = TrackedCall(call_id, fn_name, False, payload)
+        return call_id
+
+    def mark_processed(self, message_id: str, call_id: str, payload: str = "") -> None:
+        m = self._by_msg.get(message_id, {})
+        if call_id in m:
+            m[call_id].IsProcessed = True
+            if payload:
+                m[call_id].Payload = payload
+
+    def all_processed(self, message_id: str) -> bool:
+        m = self._by_msg.get(message_id, {})
+        return bool(m) and all(x.IsProcessed for x in m.values())
+
+    def processed_list(self, message_id: str) -> List[TrackedCall]:
+        return list(self._by_msg.get(message_id, {}).values())
+
+    def clear(self, message_id: str) -> None:
+        self._by_msg.pop(message_id, None)
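The tracker's lifecycle is add → mark_processed → all_processed → clear; note that despite its name, processed_list returns every tracked call for a message, processed or not. A runnable sketch (stdlib only):

from function_tracker import FunctionCallTracker

tracker = FunctionCallTracker()
call_id = tracker.add("msg-1", "get_weather", payload='{"city": "London"}')
assert not tracker.all_processed("msg-1")   # still pending

tracker.mark_processed("msg-1", call_id, payload='{"temp_c": 12}')
assert tracker.all_processed("msg-1")       # every call for msg-1 is done
print([c.Payload for c in tracker.processed_list("msg-1")])  # ['{"temp_c": 12}']
tracker.clear("msg-1")                      # drop the bookkeeping for msg-1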
message_helper.py ADDED
@@ -0,0 +1,5 @@
+# message_helper.py
+def success(msg: str) -> str: return f"</llm-success>{msg}"
+def error(msg: str) -> str: return f"</llm-error>{msg}"
+def warning(msg: str) -> str: return f"</llm-warning>{msg}"
+def info(msg: str) -> str: return f"</llm-info>{msg}"
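These helpers prefix messages with closing-style tags, presumably markers the UI splits on; for example:

from message_helper import success, error

success("model loaded")  # -> '</llm-success>model loaded'
error("out of memory")   # -> '</llm-error>out of memory'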
models.py CHANGED
@@ -127,6 +127,40 @@ class LLMServiceObj(BaseModel):
     IsFunctionCallError: bool = False
     IsFunctionCallStatus: bool = False
     IsFunctionStillRunning: bool = False
+
+    def set_as_call(self) -> "LLMServiceObj":
+        self.IsFunctionCall = True
+        self.IsFunctionCallResponse = False
+        self.IsFunctionCallError = False
+        self.IsFunctionCallStatus = False
+        self.IsFunctionStillRunning = False
+        return self
+
+    def set_as_call_error(self) -> "LLMServiceObj":
+        self.IsFunctionCall = True
+        self.IsFunctionCallResponse = False
+        self.IsFunctionCallError = True
+        self.IsFunctionCallStatus = False
+        self.IsFunctionStillRunning = False
+        return self
+
+    def set_as_response_complete(self) -> "LLMServiceObj":
+        self.IsFunctionCall = False
+        self.IsFunctionCallResponse = True
+        self.IsFunctionCallError = False
+        self.IsFunctionCallStatus = False
+        self.IsFunctionStillRunning = False
+        self.IsProcessed = True
+        return self
+
+    def set_as_not_call(self) -> "LLMServiceObj":
+        self.IsFunctionCall = False
+        self.IsFunctionCallResponse = False
+        self.IsFunctionCallError = False
+        self.IsFunctionCallStatus = False
+        self.IsFunctionStillRunning = False
+        return self
+


 class ResultObj(BaseModel):
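The new setters each force one mutually exclusive flag state and return self for chaining. A small sketch (assuming LLMServiceObj's remaining fields have defaults, which this hunk doesn't show):

obj = LLMServiceObj()
obj.set_as_call()
assert obj.IsFunctionCall and not obj.IsFunctionCallResponse

obj.set_as_response_complete()
assert obj.IsFunctionCallResponse and obj.IsProcessed and not obj.IsFunctionCall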
runners/echo.py ADDED
@@ -0,0 +1,46 @@
+# runners/echo.py
+from __future__ import annotations
+from typing import Any, Dict, Optional
+from .base import ILLMRunner
+from models import LLMServiceObj
+from function_tracker import FunctionCallTracker
+import logging
+
+class EchoRunner(ILLMRunner):
+    Type = "TurboLLM"
+    IsEnabled = True
+    IsStateStarting = False
+    IsStateFailed = False
+
+    def __init__(self, publisher, settings):
+        self._pub = publisher
+        self._settings = settings
+        self._tracker = FunctionCallTracker()
+        self._log = logging.getLogger("EchoRunner")
+
+    async def StartProcess(self, llmServiceObj: dict) -> None:
+        self._log.debug(f"StartProcess called with: {llmServiceObj}")
+        # pretend to “warm up”
+        pass
+
+    async def RemoveProcess(self, sessionId: str) -> None:
+        self._log.debug(f"RemoveProcess called for session: {sessionId}")
+        # nothing to clean here
+        pass
+
+    async def StopRequest(self, sessionId: str) -> None:
+        self._log.debug(f"StopRequest called for session: {sessionId}")
+        # no streaming loop to stop in echo
+        pass
+
+    async def SendInputAndGetResponse(self, llmServiceObj: dict) -> None:
+        self._log.debug(f"SendInputAndGetResponse called with: {llmServiceObj}")
+        llm = LLMServiceObj(**llmServiceObj)
+        if llm.UserInput.startswith("<|START_AUDIO|>") or llm.UserInput.startswith("<|STOP_AUDIO|>"):
+            self._log.debug("Audio input detected, ignoring in echo.")
+            return
+
+        # Echo behavior (match UI format)
+        await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<User:> {llm.UserInput}\n\n"))
+        await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<Assistant:> You said: {llm.UserInput}\n"))
+        await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage="<end-of-line>"))
service.py CHANGED
@@ -7,7 +7,7 @@ from config import settings
 from models import LLMServiceObj, ResultObj
 from rabbit_repo import RabbitRepo
 from runners.base import ILLMRunner
-
+from message_helper import success as _ok, error as _err

 @dataclass
 class _Session:
@@ -65,7 +65,7 @@ class LLMService:
         llm.ResultMessage = message
         llm.ResultSuccess = success
         if include_llm_message:
-            llm.LlmMessage = f"<Success>{message}</Success>" if success else f"<Error>{message}</Error>"
+            llm.LlmMessage = _ok(message) if success else _err(message)

         if check_system and llm.IsSystemLlm:
             return
@@ -164,6 +164,7 @@ class LLMService:
             try:
                 await s.Runner.RemoveProcess(sid)
                 s.Runner = None
+                self._sessions.pop(sid, None)  # ← free the entry
                 msgs.append(sid)
             except Exception as e:
                 ok = False
@@ -217,29 +218,22 @@
         await r.SendInputAndGetResponse(llm.model_dump(by_alias=True))

     async def QueryIndexResult(self, payload: Any) -> None:
-        """
-        Your .NET listener concatenates RAG outputs, sets ResultObj, and notifies the coordinator.
-        Here, we forward a service message containing the same info so the UI can reflect completion.
-        'payload' usually has: Success, Message, QueryResults: [{Output: "..."}]
-        """
         try:
             data = payload if isinstance(payload, dict) else {}
             outputs = data.get("QueryResults") or []
             rag_data = "\n".join([x.get("Output", "") for x in outputs if isinstance(x, dict)])

+            # NEW: show RAG to the chat like tool output
+            await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage=f"<Function Response:> {rag_data}\n\n"))
+            await self._pub.publish("llmServiceMessage", LLMServiceObj(LlmMessage="</functioncall-complete>"))
+
+            # keep your existing summary object (nice for observers/metrics)
             await self._pub.publish(
                 "llmServiceMessage",
-                ResultObj(
-                    Message=data.get("Message", ""),
-                    Success=bool(data.get("Success", False)),
-                    Data=rag_data,
-                ),
+                ResultObj(Message=data.get("Message", ""), Success=bool(data.get("Success", False)), Data=rag_data),
             )
         except Exception as e:
-            await self._pub.publish(
-                "llmServiceMessage",
-                ResultObj(Message=str(e), Success=False),
-            )
+            await self._pub.publish("llmServiceMessage", ResultObj(Message=str(e), Success=False))

     async def GetFunctionRegistry(self, filtered: bool = False) -> None:
         """
streaming.py ADDED
@@ -0,0 +1,20 @@
+# streaming.py
+import asyncio
+
+async def stream_in_chunks(publish, exchange: str, llm_obj_builder, text: str,
+                           batch_size: int = 3, max_chars: int = 100,
+                           base_delay_ms: int = 30, per_char_ms: int = 2) -> None:
+    seps = set(" ,!?{}.:;\n")
+    buf, parts, count = [], [], 0
+    for ch in text:
+        parts.append(ch)
+        if ch in seps:
+            buf.append("".join(parts)); parts.clear(); count += 1
+            if count >= batch_size or sum(len(x) for x in buf) >= max_chars:
+                o = llm_obj_builder("".join(buf))
+                await publish(exchange, o)
+                await asyncio.sleep((base_delay_ms + per_char_ms * sum(len(x) for x in buf)) / 1000)
+                buf.clear(); count = 0
+    if parts: buf.append("".join(parts))
+    if buf:
+        await publish(exchange, llm_obj_builder("".join(buf)))
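A runnable sketch of driving stream_in_chunks with a stand-in publisher; in the service proper, llm_obj_builder would presumably wrap each chunk in an LLMServiceObj rather than pass the raw string through:

import asyncio
from streaming import stream_in_chunks

async def fake_publish(exchange: str, obj) -> None:
    print(f"[{exchange}] {obj!r}")

asyncio.run(stream_in_chunks(
    fake_publish, "llmServiceMessage", lambda chunk: chunk,
    "Hello there, this is a longer sentence to show batching. Bye!",
))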