File size: 2,236 Bytes
0bf22fe bf292d9 2c8368f 0bf22fe 2c8368f bbfbcdd 2c8368f bbfbcdd ba71442 bf292d9 2c8368f bbfbcdd bf292d9 bbfbcdd bf292d9 0bf22fe bbfbcdd 0bf22fe 2c8368f 0bf22fe bbfbcdd 2c8368f 0bf22fe bbfbcdd 0bf22fe 2c8368f 0bf22fe bf292d9 2c8368f 0bf22fe 2c8368f bbfbcdd 2c8368f bf292d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
# rabbit_repo.py
import uuid
from typing import Any, Optional
import aio_pika
from config import settings
from models import CloudEvent
from rabbit_base import RabbitBase
from utils import to_json, json_compress_str
class RabbitRepo(RabbitBase):
    """Publish objects to RabbitMQ exchanges wrapped in a ``CloudEvent`` envelope.

    The exchange type used for each exchange name is resolved from
    ``settings.EXCHANGE_TYPES`` by longest matching prefix, falling back to
    ``settings.RABBIT_EXCHANGE_TYPE``.
    """

    def __init__(self, external_source: str):
        """Register the exchange-type resolver and remember the event source.

        Args:
            external_source: Value passed as ``source`` to every ``CloudEvent``
                built by this repository.
        """
        super().__init__(exchange_type_resolver=self._resolve_type)
        self._source = external_source  # like SystemUrl.ExternalUrl

    def _resolve_type(self, exch: str) -> str:
        """Return the exchange type configured for the exchange name *exch*.

        Keys of ``settings.EXCHANGE_TYPES`` are treated as case-insensitive
        prefixes; when several match, the longest prefix wins (like the .NET
        mapping). Without any match, ``settings.RABBIT_EXCHANGE_TYPE`` is
        returned.
        """
        # Lower-case the exchange name once rather than once per configured key.
        name = exch.lower()
        matches = [k for k in settings.EXCHANGE_TYPES if name.startswith(k.lower())]
        if matches:
            return settings.EXCHANGE_TYPES[max(matches, key=len)]
        return settings.RABBIT_EXCHANGE_TYPE

    @staticmethod
    def _as_payload(obj: Any) -> Any:
        """Return ``obj.model_dump(by_alias=True)`` if available, else *obj* itself."""
        if hasattr(obj, "model_dump"):
            return obj.model_dump(by_alias=True)
        return obj

    def _encode_event(self, obj: Any, data: Any) -> bytes:
        """Wrap *data* in a ``CloudEvent`` describing *obj*; return UTF-8 JSON bytes.

        The event gets a fresh UUID4 id, the class name of *obj* as its type
        (``"NullOrEmpty"`` when *obj* is ``None``) and this repository's source.
        Fields that are ``None`` are omitted from the serialized JSON.
        """
        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
            source=self._source,
            data=data,
        )
        return evt.model_dump_json(exclude_none=True).encode("utf-8")

    async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None:
        """Publish *obj* to *exchange* as the ``data`` of a ``CloudEvent``.

        Args:
            exchange: Name of the target exchange; obtained through
                ``self.ensure_exchange`` (presumably provided by ``RabbitBase``).
            obj: Object to send. Objects exposing ``model_dump`` are dumped
                with ``by_alias=True``; anything else is passed through as-is.
            routing_key: Routing key for the message.
        """
        ex = await self.ensure_exchange(exchange)
        payload = self._as_payload(obj)
        body = self._encode_event(obj, payload)
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)

    async def publish_jsonz(
        self,
        exchange: str,
        obj: Any,
        routing_key: str = "",
        with_id: Optional[str] = None,
    ) -> str:
        """Publish *obj* to *exchange* as a compressed JSON string.

        The object is converted with ``to_json`` and then ``json_compress_str``
        (helpers from ``utils``; their exact output format is not visible
        here), and the result becomes the ``data`` of a ``CloudEvent``.

        Args:
            exchange: Name of the target exchange.
            obj: Object to send. Objects exposing ``model_dump`` are dumped
                with ``by_alias=True`` before being converted to JSON.
            routing_key: Routing key for the message.
            with_id: When truthy, the event data is the pair
                ``(compressed_json, with_id)`` instead of the bare string.
                ``None`` and ``""`` both mean "no id".

        Returns:
            The compressed JSON string that was placed in the event.
        """
        ex = await self.ensure_exchange(exchange)
        payload = self._as_payload(obj)
        data_json = to_json(payload)
        data_json_z = json_compress_str(data_json)
        wrapped: Any = (data_json_z, with_id) if with_id else data_json_z
        body = self._encode_event(obj, wrapped)
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
        return data_json_z
|