Commit
·
bbfbcdd
1
Parent(s):
a98a368
- listener.py +3 -9
- models.py +10 -57
- rabbit_base.py +4 -2
- rabbit_repo.py +11 -9
- requirements.txt +1 -0
listener.py
CHANGED
|
@@ -2,12 +2,8 @@ import json
|
|
| 2 |
import aio_pika
|
| 3 |
from typing import Callable, Awaitable, Dict, Any, List, Optional
|
| 4 |
|
| 5 |
-
from models import CloudEvent
|
| 6 |
-
|
| 7 |
-
Handler = Callable[[Any], Awaitable[None]] # data can be dict / list / str
|
| 8 |
-
|
| 9 |
class RabbitListenerBase:
|
| 10 |
-
def __init__(self, base, instance_name: str, handlers: Dict[str,
|
| 11 |
self._base = base
|
| 12 |
self._instance_name = instance_name
|
| 13 |
self._handlers = handlers
|
|
@@ -35,13 +31,11 @@ class RabbitListenerBase:
|
|
| 35 |
async with msg.process():
|
| 36 |
try:
|
| 37 |
envelope = json.loads(msg.body.decode("utf-8"))
|
| 38 |
-
|
| 39 |
-
# (C# side doesn’t require strict validation either)
|
| 40 |
-
data = envelope.get("data", None)
|
| 41 |
if handler:
|
| 42 |
await handler(data)
|
| 43 |
except Exception:
|
| 44 |
-
#
|
| 45 |
pass
|
| 46 |
|
| 47 |
return _on_msg
|
|
|
|
| 2 |
import aio_pika
|
| 3 |
from typing import Callable, Awaitable, Dict, Any, List, Optional
|
| 4 |
|
|
|
|
|
|
|
|
|
|
|
|
|
| 5 |
class RabbitListenerBase:
|
| 6 |
+
def __init__(self, base, instance_name: str, handlers: Dict[str, Callable[[Any], Awaitable[None]]]):
|
| 7 |
self._base = base
|
| 8 |
self._instance_name = instance_name
|
| 9 |
self._handlers = handlers
|
|
|
|
| 31 |
async with msg.process():
|
| 32 |
try:
|
| 33 |
envelope = json.loads(msg.body.decode("utf-8"))
|
| 34 |
+
data = envelope.get("data", None) # dict / list / str
|
|
|
|
|
|
|
| 35 |
if handler:
|
| 36 |
await handler(data)
|
| 37 |
except Exception:
|
| 38 |
+
# Avoid requeue storms; log if you add a logger
|
| 39 |
pass
|
| 40 |
|
| 41 |
return _on_msg
|
models.py
CHANGED
|
@@ -1,9 +1,11 @@
|
|
|
|
|
|
|
|
| 1 |
from typing import Any, Optional, List
|
| 2 |
from datetime import datetime, timezone
|
| 3 |
-
from pydantic import BaseModel, Field
|
| 4 |
|
| 5 |
|
| 6 |
-
# ---------- CloudEvent ----------
|
| 7 |
class CloudEvent(BaseModel):
|
| 8 |
specversion: str = "1.0"
|
| 9 |
id: str
|
|
@@ -14,7 +16,7 @@ class CloudEvent(BaseModel):
|
|
| 14 |
data: Optional[Any] = None
|
| 15 |
|
| 16 |
@staticmethod
|
| 17 |
-
def
|
| 18 |
return datetime.now(timezone.utc)
|
| 19 |
|
| 20 |
@classmethod
|
|
@@ -23,17 +25,11 @@ class CloudEvent(BaseModel):
|
|
| 23 |
id=event_id,
|
| 24 |
type=event_type or ("NullOrEmpty" if data is None else type(data).__name__),
|
| 25 |
source=source,
|
| 26 |
-
time=cls.
|
| 27 |
data=data,
|
| 28 |
)
|
| 29 |
|
| 30 |
|
| 31 |
-
from typing import Any, Dict, Optional, List
|
| 32 |
-
from datetime import datetime, timezone
|
| 33 |
-
from pydantic import BaseModel, Field, ConfigDict # <-- add ConfigDict
|
| 34 |
-
|
| 35 |
-
# ... keep CloudEvent as you had it ...
|
| 36 |
-
|
| 37 |
# ---------- Permissive ancillary types ----------
|
| 38 |
class FunctionState(BaseModel):
|
| 39 |
IsFunctionCall: bool = False
|
|
@@ -42,19 +38,13 @@ class FunctionState(BaseModel):
|
|
| 42 |
IsFunctionCallStatus: bool = False
|
| 43 |
IsFunctionStillRunning: bool = False
|
| 44 |
|
| 45 |
-
def set_tuple(self, call: bool, resp: bool, err: bool, status: bool, running: bool):
|
| 46 |
-
self.IsFunctionCall = call
|
| 47 |
-
self.IsFunctionCallResponse = resp
|
| 48 |
-
self.IsFunctionCallError = err
|
| 49 |
-
self.IsFunctionCallStatus = status
|
| 50 |
-
self.IsFunctionStillRunning = running
|
| 51 |
|
| 52 |
class FunctionCallData(BaseModel):
|
| 53 |
-
# Accept
|
| 54 |
model_config = ConfigDict(extra="allow")
|
| 55 |
|
|
|
|
| 56 |
class UserInfo(BaseModel):
|
| 57 |
-
# Accept any extra keys (v2 style)
|
| 58 |
model_config = ConfigDict(extra="allow")
|
| 59 |
|
| 60 |
|
|
@@ -93,57 +83,20 @@ class LLMServiceObj(BaseModel):
|
|
| 93 |
UserInfo: UserInfo = Field(default_factory=UserInfo)
|
| 94 |
StartTimeUTC: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
| 95 |
|
| 96 |
-
# stacks (
|
| 97 |
LlmStack: List[str] = Field(default_factory=list)
|
| 98 |
FunctionCallIdStack: List[str] = Field(default_factory=list)
|
| 99 |
FunctionNameStack: List[str] = Field(default_factory=list)
|
| 100 |
IsProcessedStack: List[bool] = Field(default_factory=list)
|
| 101 |
MessageIDStack: List[str] = Field(default_factory=list)
|
| 102 |
|
| 103 |
-
# function state (
|
| 104 |
-
# Your C# stores this in a private field + setters; we expose the same booleans as a nested object.
|
| 105 |
IsFunctionCall: bool = False
|
| 106 |
IsFunctionCallResponse: bool = False
|
| 107 |
IsFunctionCallError: bool = False
|
| 108 |
IsFunctionCallStatus: bool = False
|
| 109 |
IsFunctionStillRunning: bool = False
|
| 110 |
|
| 111 |
-
# Convenience helpers (optional)
|
| 112 |
-
def push_llm(self, llm_name: str, new_call_id: str, new_func_name: str, new_message_id: str, new_is_processed: bool):
|
| 113 |
-
if self.SourceLlm:
|
| 114 |
-
self.LlmStack.append(self.SourceLlm)
|
| 115 |
-
self.SourceLlm = self.DestinationLlm
|
| 116 |
-
self.DestinationLlm = llm_name
|
| 117 |
-
|
| 118 |
-
if self.MessageID:
|
| 119 |
-
self.MessageIDStack.append(self.MessageID)
|
| 120 |
-
self.MessageID = new_message_id
|
| 121 |
-
|
| 122 |
-
if self.FunctionCallId:
|
| 123 |
-
self.FunctionCallIdStack.append(self.FunctionCallId)
|
| 124 |
-
self.FunctionCallId = new_call_id
|
| 125 |
-
|
| 126 |
-
if self.FunctionName:
|
| 127 |
-
self.FunctionNameStack.append(self.FunctionName)
|
| 128 |
-
self.FunctionName = new_func_name
|
| 129 |
-
|
| 130 |
-
self.IsProcessedStack.append(self.IsProcessed)
|
| 131 |
-
self.IsProcessed = new_is_processed
|
| 132 |
-
|
| 133 |
-
def pop_llm(self):
|
| 134 |
-
if self.LlmStack:
|
| 135 |
-
self.SourceLlm = self.LlmStack.pop()
|
| 136 |
-
self.DestinationLlm = self.SourceLlm
|
| 137 |
-
|
| 138 |
-
if self.MessageIDStack:
|
| 139 |
-
self.MessageID = self.MessageIDStack.pop()
|
| 140 |
-
if self.FunctionCallIdStack:
|
| 141 |
-
self.FunctionCallId = self.FunctionCallIdStack.pop()
|
| 142 |
-
if self.FunctionNameStack:
|
| 143 |
-
self.FunctionName = self.FunctionNameStack.pop()
|
| 144 |
-
if self.IsProcessedStack:
|
| 145 |
-
self.IsProcessed = self.IsProcessedStack.pop()
|
| 146 |
-
|
| 147 |
|
| 148 |
# ---------- ResultObj ----------
|
| 149 |
class ResultObj(BaseModel):
|
|
|
|
| 1 |
+
from __future__ import annotations
|
| 2 |
+
|
| 3 |
from typing import Any, Optional, List
|
| 4 |
from datetime import datetime, timezone
|
| 5 |
+
from pydantic import BaseModel, Field, ConfigDict
|
| 6 |
|
| 7 |
|
| 8 |
+
# ---------- CloudEvent (Pydantic v2 model) ----------
|
| 9 |
class CloudEvent(BaseModel):
|
| 10 |
specversion: str = "1.0"
|
| 11 |
id: str
|
|
|
|
| 16 |
data: Optional[Any] = None
|
| 17 |
|
| 18 |
@staticmethod
|
| 19 |
+
def now_utc() -> datetime:
|
| 20 |
return datetime.now(timezone.utc)
|
| 21 |
|
| 22 |
@classmethod
|
|
|
|
| 25 |
id=event_id,
|
| 26 |
type=event_type or ("NullOrEmpty" if data is None else type(data).__name__),
|
| 27 |
source=source,
|
| 28 |
+
time=cls.now_utc(),
|
| 29 |
data=data,
|
| 30 |
)
|
| 31 |
|
| 32 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 33 |
# ---------- Permissive ancillary types ----------
|
| 34 |
class FunctionState(BaseModel):
|
| 35 |
IsFunctionCall: bool = False
|
|
|
|
| 38 |
IsFunctionCallStatus: bool = False
|
| 39 |
IsFunctionStillRunning: bool = False
|
| 40 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 41 |
|
| 42 |
class FunctionCallData(BaseModel):
|
| 43 |
+
# Accept arbitrary keys so we mirror your .NET payloads without schema fights
|
| 44 |
model_config = ConfigDict(extra="allow")
|
| 45 |
|
| 46 |
+
|
| 47 |
class UserInfo(BaseModel):
|
|
|
|
| 48 |
model_config = ConfigDict(extra="allow")
|
| 49 |
|
| 50 |
|
|
|
|
| 83 |
UserInfo: UserInfo = Field(default_factory=UserInfo)
|
| 84 |
StartTimeUTC: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
|
| 85 |
|
| 86 |
+
# stacks (arrays; C# Stack<T> will deserialize fine)
|
| 87 |
LlmStack: List[str] = Field(default_factory=list)
|
| 88 |
FunctionCallIdStack: List[str] = Field(default_factory=list)
|
| 89 |
FunctionNameStack: List[str] = Field(default_factory=list)
|
| 90 |
IsProcessedStack: List[bool] = Field(default_factory=list)
|
| 91 |
MessageIDStack: List[str] = Field(default_factory=list)
|
| 92 |
|
| 93 |
+
# function state flags (since your C# exposes these via methods/flags)
|
|
|
|
| 94 |
IsFunctionCall: bool = False
|
| 95 |
IsFunctionCallResponse: bool = False
|
| 96 |
IsFunctionCallError: bool = False
|
| 97 |
IsFunctionCallStatus: bool = False
|
| 98 |
IsFunctionStillRunning: bool = False
|
| 99 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 100 |
|
| 101 |
# ---------- ResultObj ----------
|
| 102 |
class ResultObj(BaseModel):
|
rabbit_base.py
CHANGED
|
@@ -1,10 +1,12 @@
|
|
| 1 |
-
import asyncio
|
| 2 |
-
import aio_pika
|
| 3 |
from typing import Callable, Dict, List, Optional
|
|
|
|
|
|
|
| 4 |
from config import settings
|
| 5 |
|
| 6 |
ExchangeResolver = Callable[[str], str] # exchangeName -> exchangeType
|
| 7 |
|
|
|
|
| 8 |
class RabbitBase:
|
| 9 |
def __init__(self, exchange_type_resolver: Optional[ExchangeResolver] = None):
|
| 10 |
self._conn: Optional[aio_pika.RobustConnection] = None
|
|
|
|
| 1 |
+
import asyncio
|
|
|
|
| 2 |
from typing import Callable, Dict, List, Optional
|
| 3 |
+
import aio_pika
|
| 4 |
+
|
| 5 |
from config import settings
|
| 6 |
|
| 7 |
ExchangeResolver = Callable[[str], str] # exchangeName -> exchangeType
|
| 8 |
|
| 9 |
+
|
| 10 |
class RabbitBase:
|
| 11 |
def __init__(self, exchange_type_resolver: Optional[ExchangeResolver] = None):
|
| 12 |
self._conn: Optional[aio_pika.RobustConnection] = None
|
rabbit_repo.py
CHANGED
|
@@ -2,14 +2,16 @@ import uuid
|
|
| 2 |
from typing import Any, Optional
|
| 3 |
import aio_pika
|
| 4 |
|
|
|
|
| 5 |
from models import CloudEvent
|
|
|
|
| 6 |
from utils import to_json, json_compress_str
|
| 7 |
|
| 8 |
|
| 9 |
-
class RabbitRepo:
|
| 10 |
-
|
| 11 |
super().__init__(exchange_type_resolver=self._resolve_type)
|
| 12 |
-
self.
|
| 13 |
|
| 14 |
def _resolve_type(self, exch: str) -> str:
|
| 15 |
# longest prefix wins (like your .NET mapping)
|
|
@@ -19,19 +21,19 @@ class RabbitRepo:
|
|
| 19 |
return settings.RABBIT_EXCHANGE_TYPE
|
| 20 |
|
| 21 |
async def publish(self, exchange: str, obj: Any, routing_key: str = ""):
|
| 22 |
-
ex = await self.
|
| 23 |
evt = CloudEvent.wrap(
|
| 24 |
event_id=str(uuid.uuid4()),
|
| 25 |
event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
|
| 26 |
source=self._source,
|
| 27 |
-
data=obj,
|
| 28 |
)
|
| 29 |
-
body = evt.model_dump_json(
|
| 30 |
await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
|
| 31 |
|
| 32 |
async def publish_jsonz(self, exchange: str, obj: Any, routing_key: str = "", with_id: Optional[str] = None) -> str:
|
| 33 |
-
ex = await self.
|
| 34 |
-
datajson = to_json(obj)
|
| 35 |
datajsonZ = json_compress_str(datajson)
|
| 36 |
payload: Any = (datajsonZ, with_id) if with_id else datajsonZ
|
| 37 |
|
|
@@ -41,6 +43,6 @@ class RabbitRepo:
|
|
| 41 |
source=self._source,
|
| 42 |
data=payload,
|
| 43 |
)
|
| 44 |
-
body = evt.model_dump_json(
|
| 45 |
await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
|
| 46 |
return datajsonZ
|
|
|
|
| 2 |
from typing import Any, Optional
|
| 3 |
import aio_pika
|
| 4 |
|
| 5 |
+
from config import settings
|
| 6 |
from models import CloudEvent
|
| 7 |
+
from rabbit_base import RabbitBase
|
| 8 |
from utils import to_json, json_compress_str
|
| 9 |
|
| 10 |
|
| 11 |
+
class RabbitRepo(RabbitBase):
|
| 12 |
+
def __init__(self, external_source: str):
|
| 13 |
super().__init__(exchange_type_resolver=self._resolve_type)
|
| 14 |
+
self._source = external_source # like SystemUrl.ExternalUrl
|
| 15 |
|
| 16 |
def _resolve_type(self, exch: str) -> str:
|
| 17 |
# longest prefix wins (like your .NET mapping)
|
|
|
|
| 21 |
return settings.RABBIT_EXCHANGE_TYPE
|
| 22 |
|
| 23 |
async def publish(self, exchange: str, obj: Any, routing_key: str = ""):
|
| 24 |
+
ex = await self.ensure_exchange(exchange)
|
| 25 |
evt = CloudEvent.wrap(
|
| 26 |
event_id=str(uuid.uuid4()),
|
| 27 |
event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
|
| 28 |
source=self._source,
|
| 29 |
+
data=obj if not hasattr(obj, "model_dump") else obj.model_dump(),
|
| 30 |
)
|
| 31 |
+
body = evt.model_dump_json(exclude_none=True).encode("utf-8")
|
| 32 |
await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
|
| 33 |
|
| 34 |
async def publish_jsonz(self, exchange: str, obj: Any, routing_key: str = "", with_id: Optional[str] = None) -> str:
|
| 35 |
+
ex = await self.ensure_exchange(exchange)
|
| 36 |
+
datajson = to_json(obj if not hasattr(obj, "model_dump") else obj.model_dump())
|
| 37 |
datajsonZ = json_compress_str(datajson)
|
| 38 |
payload: Any = (datajsonZ, with_id) if with_id else datajsonZ
|
| 39 |
|
|
|
|
| 43 |
source=self._source,
|
| 44 |
data=payload,
|
| 45 |
)
|
| 46 |
+
body = evt.model_dump_json(exclude_none=True).encode("utf-8")
|
| 47 |
await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
|
| 48 |
return datajsonZ
|
requirements.txt
CHANGED
|
@@ -4,3 +4,4 @@ uvicorn==0.35.0
|
|
| 4 |
aio-pika==9.5.7
|
| 5 |
pydantic==2.11.1
|
| 6 |
pydantic-settings==2.10.1
|
|
|
|
|
|
| 4 |
aio-pika==9.5.7
|
| 5 |
pydantic==2.11.1
|
| 6 |
pydantic-settings==2.10.1
|
| 7 |
+
|