File size: 2,518 Bytes
0bf22fe bf292d9 2c8368f 0bf22fe 2c8368f bbfbcdd 2c8368f bbfbcdd ba71442 32b704b 2c8368f bbfbcdd bf292d9 0bfda05 bf292d9 3692feb bf292d9 0bf22fe bbfbcdd 0bf22fe 2c8368f 0bf22fe bbfbcdd 2c8368f 0bf22fe bbfbcdd 0bf22fe 2c8368f 0bf22fe bf292d9 2c8368f 0bf22fe 2c8368f bbfbcdd 2c8368f bf292d9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
# rabbit_repo.py
import uuid
from typing import Any, Optional
import aio_pika
from config import settings
from models import CloudEvent
from rabbit_base import RabbitBase
from utils import to_json, json_compress_str
import logging
logger = logging.getLogger(__name__)
class RabbitRepo(RabbitBase):
    """Publish objects to message exchanges wrapped in ``CloudEvent`` envelopes.

    The instance passes its own ``_resolve_type`` to ``RabbitBase`` as the
    ``exchange_type_resolver`` and stamps every event with the source given
    at construction time.
    """

    def __init__(self, external_source: str):
        """Initialise the repository.

        Args:
            external_source: Value passed as ``source`` to every
                ``CloudEvent.wrap`` call made by this instance.
        """
        super().__init__(exchange_type_resolver=self._resolve_type)
        self._source = external_source

    def _resolve_type(self, exch: str) -> str:
        """Return the exchange type to use for the exchange named *exch*.

        Resolution order (prefix matches are case-insensitive):

        1. Names starting with ``"oa."`` always resolve to ``"direct"``,
           before ``settings.EXCHANGE_TYPES`` is consulted.
        2. Otherwise, if ``settings.EXCHANGE_TYPES`` exists and is non-empty,
           the longest key that is a prefix of *exch* selects the type.
        3. Otherwise ``"fanout"``.
        """
        name = exch.lower()
        if name.startswith("oa."):
            return "direct"

        exchange_types = getattr(settings, "EXCHANGE_TYPES", None)
        if exchange_types:
            candidates = [
                key for key in exchange_types if name.startswith(key.lower())
            ]
            if candidates:
                # Longest matching prefix wins (most specific configuration).
                return exchange_types[max(candidates, key=len)]

        return "fanout"

    @staticmethod
    def _payload_of(obj: Any) -> Any:
        """Return ``obj.model_dump(by_alias=True)`` when available, else *obj*."""
        if hasattr(obj, "model_dump"):
            return obj.model_dump(by_alias=True)
        return obj

    async def _send_event(
        self, ex: Any, obj: Any, data: Any, routing_key: str
    ) -> None:
        """Wrap *data* in a ``CloudEvent`` and publish it on exchange *ex*.

        The event gets a fresh UUID4 id; its type is the class name of *obj*,
        or ``"NullOrEmpty"`` when *obj* is ``None``.
        """
        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(
                obj.__class__.__name__ if obj is not None else "NullOrEmpty"
            ),
            source=self._source,
            data=data,
        )
        body = evt.model_dump_json(exclude_none=True).encode("utf-8")
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)

    async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None:
        """Publish *obj* to *exchange* as the data of a ``CloudEvent``.

        Args:
            exchange: Name handed to ``self.ensure_exchange``.
            obj: Object to send. If it has a ``model_dump`` method, the
                result of ``model_dump(by_alias=True)`` is sent instead.
            routing_key: Routing key for the published message.
        """
        ex = await self.ensure_exchange(exchange)
        await self._send_event(ex, obj, self._payload_of(obj), routing_key)

    async def publish_jsonz(
        self,
        exchange: str,
        obj: Any,
        routing_key: str = "",
        with_id: Optional[str] = None,
    ) -> str:
        """Publish *obj* to *exchange* as compressed JSON inside a ``CloudEvent``.

        The payload is serialised with ``to_json`` and then passed through
        ``json_compress_str``; the result becomes the event data.

        Args:
            exchange: Name handed to ``self.ensure_exchange``.
            obj: Object to send. If it has a ``model_dump`` method, the
                result of ``model_dump(by_alias=True)`` is serialised instead.
            routing_key: Routing key for the published message.
            with_id: When truthy, the event data is the tuple
                ``(compressed, with_id)`` instead of the bare compressed
                value. ``None`` and ``""`` are both treated as "no id".

        Returns:
            The value returned by ``json_compress_str`` for the payload.
        """
        ex = await self.ensure_exchange(exchange)
        compressed = json_compress_str(to_json(self._payload_of(obj)))
        data: Any = (compressed, with_id) if with_id else compressed
        await self._send_event(ex, obj, data, routing_key)
        return compressed
|