File size: 1,930 Bytes
0bf22fe
bf292d9
0bf22fe
 
bf292d9
0bf22fe
 
 
bf292d9
2c8368f
0bf22fe
2c8368f
0bf22fe
bf292d9
 
 
 
2c8368f
bf292d9
2c8368f
bf292d9
0bf22fe
bf292d9
 
 
2c8368f
 
0bf22fe
 
 
bf292d9
 
 
 
 
2c8368f
bf292d9
 
 
 
0bf22fe
 
bf292d9
 
2c8368f
0bf22fe
bf292d9
2c8368f
bf292d9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# listener.py
import json
from typing import Callable, Awaitable, Dict, Any, List

import aio_pika

Handler = Callable[[Any], Awaitable[None]]  # payload is envelope["data"]


class RabbitListenerBase:
    def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
        self._base = base
        self._instance_name = instance_name  # queue prefix (like your .NET instance name)
        self._handlers = handlers
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk])) or ""
        suffix = f"-{rk_part}" if rk_part else ""
        return f"{self._instance_name}-{exchange}{suffix}"

    async def start(self, declarations: List[dict]) -> None:
        for d in declarations:
            exch = d["ExchangeName"]
            ttl = d.get("MessageTimeout") or None
            rks = d.get("RoutingKeys") or [""]
            qname = self._qname(exch, rks)
            q = await self._base.declare_queue_bind(
                exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl
            )
            await q.consume(self._make_consumer(d["FuncName"]))
            self._consumers.append(q)

    def _make_consumer(self, func_name: str):
        handler = self._handlers.get(func_name)

        async def _on_msg(msg: aio_pika.IncomingMessage):
            async with msg.process():
                try:
                    envelope = json.loads(msg.body.decode("utf-8"))
                    # Expect CloudEvent-ish envelope; we only need the 'data' field
                    data = envelope.get("data", None)
                    if handler:
                        await handler(data)
                except Exception:
                    # Avoid requeue storms; add logging if you want
                    pass

        return _on_msg