File size: 2,289 Bytes
bf292d9
7630510
0bf22fe
 
bf292d9
0bf22fe
 
 
7630510
 
 
 
 
 
 
bf292d9
2c8368f
0bf22fe
2c8368f
0bf22fe
bf292d9
 
 
 
2c8368f
bf292d9
2c8368f
bf292d9
0bf22fe
bf292d9
 
 
2c8368f
 
0bf22fe
 
 
bf292d9
 
 
 
 
2c8368f
bf292d9
 
 
7630510
 
 
 
 
 
 
 
0bf22fe
bf292d9
 
7630510
 
2c8368f
bf292d9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import json
import logging
from typing import Callable, Awaitable, Dict, Any, List

import aio_pika

Handler = Callable[[Any], Awaitable[None]]  # payload is envelope["data"]

# Configure root logger if not already configured
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)


class RabbitListenerBase:
    def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
        self._base = base
        self._instance_name = instance_name  # queue prefix (like your .NET instance name)
        self._handlers = handlers
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        rk_part = "-".join(sorted([rk for rk in (routing_keys or [""]) if rk])) or ""
        suffix = f"-{rk_part}" if rk_part else ""
        return f"{self._instance_name}-{exchange}{suffix}"

    async def start(self, declarations: List[dict]) -> None:
        for d in declarations:
            exch = d["ExchangeName"]
            ttl = d.get("MessageTimeout") or None
            rks = d.get("RoutingKeys") or [""]
            qname = self._qname(exch, rks)
            q = await self._base.declare_queue_bind(
                exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl
            )
            await q.consume(self._make_consumer(d["FuncName"]))
            self._consumers.append(q)

    def _make_consumer(self, func_name: str):
        handler = self._handlers.get(func_name)

        async def _on_msg(msg: aio_pika.IncomingMessage):
            async with msg.process():
                try:
                    raw_body = msg.body.decode("utf-8", errors="replace")
                    logger.info(
                        "Received message for handler '%s': %s",
                        func_name,
                        raw_body
                    )

                    envelope = json.loads(raw_body)
                    data = envelope.get("data", None)
                    if handler:
                        await handler(data)
                except Exception as e:
                    logger.exception("Error processing message for '%s'", func_name)

        return _on_msg