johnbridges committed on
Commit
531b571
·
1 Parent(s): 9a08c50

Added the RabbitMQ user id to the outbound RabbitMQ message

Browse files
__pycache__/app.cpython-313.pyc ADDED
Binary file (4.65 kB). View file
 
__pycache__/hf_backend.cpython-313.pyc ADDED
Binary file (16.4 kB). View file
 
__pycache__/oa_server.cpython-313.pyc ADDED
Binary file (7.94 kB). View file
 
__pycache__/rabbit_repo.cpython-313.pyc ADDED
Binary file (6.86 kB). View file
 
__pycache__/timesfm_backend.cpython-313.pyc ADDED
Binary file (13 kB). View file
 
app.py CHANGED
@@ -10,8 +10,7 @@ from oa_server import OpenAIServers
10
  #from vllm_backend import VLLMChatBackend, StubImagesBackend
11
  #from transformers_backend import TransformersChatBackend, StubImagesBackend
12
  #from hf_backend import HFChatBackend, StubImagesBackend
13
- from hf_backend import StubImagesBackend
14
- from timesfm_backend import TimesFMBackend
15
 
16
 
17
  logging.basicConfig(
@@ -62,7 +61,7 @@ async def _startup_init():
62
  try:
63
  await base.connect() # connect to RabbitMQ
64
  await listener.start(DECLS) # start queue listeners
65
- return "OpenAI MQ + vLLM: ready"
66
  except Exception as e:
67
  log.exception("Startup init failed")
68
  return f"ERROR: {e}"
 
10
  #from vllm_backend import VLLMChatBackend, StubImagesBackend
11
  #from transformers_backend import TransformersChatBackend, StubImagesBackend
12
  #from hf_backend import HFChatBackend, StubImagesBackend
13
+ from timesfm_backend import TimesFMBackend, StubImagesBackend
 
14
 
15
 
16
  logging.basicConfig(
 
61
  try:
62
  await base.connect() # connect to RabbitMQ
63
  await listener.start(DECLS) # start queue listeners
64
+ return "OpenAI MQ + TimesFM: ready"
65
  except Exception as e:
66
  log.exception("Startup init failed")
67
  return f"ERROR: {e}"
oa_server.py CHANGED
@@ -10,6 +10,28 @@ logger = logging.getLogger(__name__)
10
 
11
  # ------------------ helpers ------------------
12
  def _now() -> int: return int(time.time())
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
13
  def _chunk_text(s: str, sz: int = 140) -> List[str]:
14
  return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else []
15
  def _last_user_text(messages: List[Dict[str, Any]]) -> str:
@@ -55,17 +77,26 @@ class OpenAIServers:
55
  logger.error("oaChatCreate: missing reply_key")
56
  return
57
 
 
58
  try:
59
  async for chunk in self._chat.stream(data):
60
  try:
61
- await self._pub.publish("oa.chat.reply", chunk, routing_key=reply_key)
 
 
 
 
62
  except Exception:
63
  logger.exception("oaChatCreate: publish failed")
64
  break # stop streaming on publish failure
65
 
66
  # Optional sentinel
67
  try:
68
- await self._pub.publish("oa.chat.reply", {"object": "stream.end"}, routing_key=reply_key)
 
 
 
 
69
  except Exception:
70
  logger.exception("oaChatCreate: publish sentinel failed")
71
  except Exception:
@@ -84,11 +115,16 @@ class OpenAIServers:
84
  logger.error("oaImagesGenerate: missing reply_key")
85
  return
86
 
 
87
  try:
88
  b64 = await self._img.generate_b64(data)
89
  resp = {"created": _now(), "data":[{"b64_json": b64}]}
90
  try:
91
- await self._pub.publish("oa.images.reply", resp, routing_key=reply_key)
 
 
 
 
92
  except Exception:
93
  logger.exception("oaImagesGenerate: publish failed")
94
  except Exception:
 
10
 
11
  # ------------------ helpers ------------------
12
  def _now() -> int: return int(time.time())
13
+ def _extract_user_id(data: Dict[str, Any]) -> Optional[str]:
14
+ if not isinstance(data, dict):
15
+ return None
16
+ for k in ("userId", "UserID", "user_id"):
17
+ v = data.get(k)
18
+ if isinstance(v, str) and v.strip():
19
+ return v.strip()
20
+ ui = data.get("UserInfo")
21
+ if isinstance(ui, dict):
22
+ v = ui.get("UserID") or ui.get("userId") or ui.get("user_id")
23
+ if isinstance(v, str) and v.strip():
24
+ return v.strip()
25
+ return None
26
+
27
+ def _with_user_id(payload: Dict[str, Any], user_id: Optional[str]) -> Dict[str, Any]:
28
+ if not user_id:
29
+ return payload
30
+ if isinstance(payload, dict):
31
+ if not payload.get("userId"):
32
+ payload = {**payload, "userId": user_id}
33
+ return payload
34
+
35
  def _chunk_text(s: str, sz: int = 140) -> List[str]:
36
  return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else []
37
  def _last_user_text(messages: List[Dict[str, Any]]) -> str:
 
77
  logger.error("oaChatCreate: missing reply_key")
78
  return
79
 
80
+ user_id = _extract_user_id(data)
81
  try:
82
  async for chunk in self._chat.stream(data):
83
  try:
84
+ await self._pub.publish(
85
+ "oa.chat.reply",
86
+ _with_user_id(chunk, user_id),
87
+ routing_key=reply_key,
88
+ )
89
  except Exception:
90
  logger.exception("oaChatCreate: publish failed")
91
  break # stop streaming on publish failure
92
 
93
  # Optional sentinel
94
  try:
95
+ await self._pub.publish(
96
+ "oa.chat.reply",
97
+ _with_user_id({"object": "stream.end"}, user_id),
98
+ routing_key=reply_key,
99
+ )
100
  except Exception:
101
  logger.exception("oaChatCreate: publish sentinel failed")
102
  except Exception:
 
115
  logger.error("oaImagesGenerate: missing reply_key")
116
  return
117
 
118
+ user_id = _extract_user_id(data)
119
  try:
120
  b64 = await self._img.generate_b64(data)
121
  resp = {"created": _now(), "data":[{"b64_json": b64}]}
122
  try:
123
+ await self._pub.publish(
124
+ "oa.images.reply",
125
+ _with_user_id(resp, user_id),
126
+ routing_key=reply_key,
127
+ )
128
  except Exception:
129
  logger.exception("oaImagesGenerate: publish failed")
130
  except Exception:
rabbit_repo.py CHANGED
@@ -3,6 +3,7 @@ import uuid
3
  import asyncio
4
  import logging
5
  from typing import Any, Optional
 
6
 
7
  import aiormq
8
  import aio_pika
@@ -30,14 +31,30 @@ class RabbitRepo(RabbitBase):
30
  return settings.EXCHANGE_TYPES[max(matches, key=len)]
31
  return "fanout"
32
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  async def _publish_with_retry(self, exchange: str, body: bytes, routing_key: str = "") -> None:
34
  attempts, delay = 0, 0.5
 
35
  while True:
36
  try:
37
  ex = await self.ensure_exchange(exchange)
38
  msg = aio_pika.Message(
39
  body=body,
40
  delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
 
41
  )
42
  await ex.publish(msg, routing_key=routing_key)
43
  return
 
3
  import asyncio
4
  import logging
5
  from typing import Any, Optional
6
+ from urllib.parse import urlsplit, unquote
7
 
8
  import aiormq
9
  import aio_pika
 
31
  return settings.EXCHANGE_TYPES[max(matches, key=len)]
32
  return "fanout"
33
 
34
+ def _publisher_user_id(self) -> Optional[str]:
35
+ user = getattr(settings, "RABBIT_USER_NAME", None)
36
+ if isinstance(user, str) and user.strip():
37
+ return user.strip()
38
+ amqp_url = getattr(settings, "AMQP_URL", None)
39
+ if isinstance(amqp_url, str) and amqp_url.strip():
40
+ try:
41
+ parsed = urlsplit(amqp_url)
42
+ if parsed.username:
43
+ return unquote(parsed.username)
44
+ except Exception:
45
+ pass
46
+ return None
47
+
48
  async def _publish_with_retry(self, exchange: str, body: bytes, routing_key: str = "") -> None:
49
  attempts, delay = 0, 0.5
50
+ publisher_user_id = self._publisher_user_id()
51
  while True:
52
  try:
53
  ex = await self.ensure_exchange(exchange)
54
  msg = aio_pika.Message(
55
  body=body,
56
  delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
57
+ user_id=publisher_user_id,
58
  )
59
  await ex.publish(msg, routing_key=routing_key)
60
  return
requirements.txt CHANGED
@@ -11,7 +11,6 @@ pydantic>=2.11.7
11
  pydantic-settings>=2.10.1
12
 
13
  # Hugging Face / LLM
14
- spaces
15
  accelerate
16
  autoawq
17
  huggingface_hub
 
11
  pydantic-settings>=2.10.1
12
 
13
  # Hugging Face / LLM
 
14
  accelerate
15
  autoawq
16
  huggingface_hub