File size: 1,930 Bytes
0bf22fe bf292d9 0bf22fe bf292d9 0bf22fe bf292d9 2c8368f 0bf22fe 2c8368f 0bf22fe bf292d9 2c8368f bf292d9 2c8368f bf292d9 0bf22fe bf292d9 2c8368f 0bf22fe bf292d9 2c8368f bf292d9 0bf22fe bf292d9 2c8368f 0bf22fe bf292d9 2c8368f bf292d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
# listener.py
import json
import logging
from typing import Callable, Awaitable, Dict, Any, List

import aio_pika
# Async callable invoked with the decoded envelope's "data" field.
Handler = Callable[[Any], Awaitable[None]]  # payload is envelope["data"]

_logger = logging.getLogger(__name__)


class RabbitListenerBase:
    """Consume RabbitMQ queues and dispatch message payloads to handlers.

    Each declaration passed to :meth:`start` yields one queue whose name is
    derived from the instance name, the exchange and the routing keys. Every
    message is JSON-decoded and its ``"data"`` field is passed to the handler
    registered under the declaration's ``"FuncName"``.
    """

    def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
        """Store the collaborators; no broker I/O happens here.

        Args:
            base: Object providing an awaitable ``declare_queue_bind(exchange=,
                queue_name=, routing_keys=, ttl_ms=)`` that returns a queue
                with an awaitable ``consume(callback)``.
            instance_name: Prefix for every queue name built by this listener.
            handlers: Mapping from a declaration's ``"FuncName"`` to the async
                handler that receives the message payload.
        """
        self._base = base
        self._instance_name = instance_name  # queue prefix (like your .NET instance name)
        self._handlers = handlers
        # Queues that currently have a consumer attached by start().
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        """Return the queue name ``<instance>-<exchange>[-<rk1>-<rk2>...]``.

        Empty routing keys are ignored and the rest are sorted, so the name
        does not depend on the order in which the keys were declared.
        """
        keys = sorted(rk for rk in (routing_keys or ()) if rk)
        name = f"{self._instance_name}-{exchange}"
        if keys:
            name = f"{name}-{'-'.join(keys)}"
        return name

    async def start(self, declarations: List[dict]) -> None:
        """Declare, bind and start consuming one queue per declaration.

        Each declaration must contain ``"ExchangeName"`` and ``"FuncName"``;
        ``"RoutingKeys"`` (defaults to ``[""]``) and ``"MessageTimeout"``
        (forwarded as ``ttl_ms``; falsy values become ``None``) are optional.

        Raises:
            KeyError: If a declaration lacks ``"ExchangeName"`` or
                ``"FuncName"``.
        """
        for decl in declarations:
            exchange = decl["ExchangeName"]
            # Build the callback first so a malformed declaration fails
            # before a queue is declared and bound for it.
            on_message = self._make_consumer(decl["FuncName"])
            ttl = decl.get("MessageTimeout") or None
            routing_keys = decl.get("RoutingKeys") or [""]

            queue = await self._base.declare_queue_bind(
                exchange=exchange,
                queue_name=self._qname(exchange, routing_keys),
                routing_keys=routing_keys,
                ttl_ms=ttl,
            )
            await queue.consume(on_message)
            self._consumers.append(queue)

    def _make_consumer(self, func_name: str):
        """Return the async message callback bound to ``func_name``'s handler.

        The callback never raises for ordinary errors: malformed messages and
        handler failures are logged and the message is still acknowledged,
        which avoids requeue storms. When no handler is registered, a warning
        is logged once here and messages are consumed without being
        dispatched.
        """
        handler = self._handlers.get(func_name)
        if not handler:
            _logger.warning(
                "No handler registered for %r; its messages will be "
                "acknowledged and dropped",
                func_name,
            )

        async def _on_msg(msg: "aio_pika.IncomingMessage") -> None:
            # aio_pika acks the message when the process() block exits
            # without raising, so every error below is handled inside it.
            async with msg.process():
                try:
                    envelope = json.loads(msg.body.decode("utf-8"))
                    # Expect CloudEvent-ish envelope; we only need the 'data' field
                    data = envelope.get("data")
                except Exception:
                    _logger.exception(
                        "Dropping malformed message for %r", func_name
                    )
                    return

                if not handler:
                    return

                try:
                    await handler(data)
                except Exception:
                    _logger.exception(
                        "Handler %r failed; message dropped", func_name
                    )

        return _on_msg
|