File size: 2,236 Bytes
0bf22fe
bf292d9
2c8368f
0bf22fe
2c8368f
 
bbfbcdd
2c8368f
bbfbcdd
ba71442
bf292d9
2c8368f
bbfbcdd
 
bf292d9
bbfbcdd
bf292d9
 
 
 
 
 
 
 
0bf22fe
bbfbcdd
0bf22fe
2c8368f
 
 
 
0bf22fe
 
bbfbcdd
2c8368f
 
0bf22fe
 
 
 
 
 
 
bbfbcdd
0bf22fe
 
2c8368f
0bf22fe
bf292d9
2c8368f
 
 
 
0bf22fe
2c8368f
bbfbcdd
2c8368f
bf292d9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
# rabbit_repo.py
import uuid
from typing import Any, Optional

import aio_pika

from config import settings
from models import CloudEvent
from rabbit_base import RabbitBase
from utils import to_json, json_compress_str


class RabbitRepo(RabbitBase):
    def __init__(self, external_source: str):
        super().__init__(exchange_type_resolver=self._resolve_type)
        self._source = external_source  # like SystemUrl.ExternalUrl

    def _resolve_type(self, exch: str) -> str:
        # longest prefix wins (like your .NET mapping)
        matches = [k for k in settings.EXCHANGE_TYPES.keys() if exch.lower().startswith(k.lower())]
        if matches:
            return settings.EXCHANGE_TYPES[max(matches, key=len)]
        return settings.RABBIT_EXCHANGE_TYPE

    async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None:
        ex = await self.ensure_exchange(exchange)
        payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
            source=self._source,
            data=payload,
        )
        body = evt.model_dump_json(exclude_none=True).encode("utf-8")
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)

    async def publish_jsonz(
        self,
        exchange: str,
        obj: Any,
        routing_key: str = "",
        with_id: Optional[str] = None,
    ) -> str:
        ex = await self.ensure_exchange(exchange)
        payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
        datajson = to_json(payload)
        datajsonZ = json_compress_str(datajson)
        wrapped: Any = (datajsonZ, with_id) if with_id else datajsonZ

        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
            source=self._source,
            data=wrapped,
        )
        body = evt.model_dump_json(exclude_none=True).encode("utf-8")
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
        return datajsonZ