File size: 2,518 Bytes
0bf22fe
bf292d9
2c8368f
0bf22fe
2c8368f
 
bbfbcdd
2c8368f
bbfbcdd
ba71442
32b704b
 
2c8368f
bbfbcdd
 
bf292d9
0bfda05
bf292d9
 
3692feb
 
 
 
 
 
 
 
 
 
 
 
 
bf292d9
0bf22fe
bbfbcdd
0bf22fe
2c8368f
 
 
 
0bf22fe
 
bbfbcdd
2c8368f
 
0bf22fe
 
 
 
 
 
 
bbfbcdd
0bf22fe
 
2c8368f
0bf22fe
bf292d9
2c8368f
 
 
 
0bf22fe
2c8368f
bbfbcdd
2c8368f
bf292d9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# rabbit_repo.py
import uuid
from typing import Any, Optional

import aio_pika

from config import settings
from models import CloudEvent
from rabbit_base import RabbitBase
from utils import to_json, json_compress_str
import logging
logger = logging.getLogger(__name__)

class RabbitRepo(RabbitBase):
    def __init__(self, external_source: str):
        super().__init__(exchange_type_resolver=self._resolve_type)
        self._source = external_source

    def _resolve_type(self, exch: str) -> str:
        # First check for oa.* exchanges
        if exch.lower().startswith("oa."):
            return "direct"  # Default for oa.* exchanges
    
        # Then check EXCHANGE_TYPES if present
        if hasattr(settings, 'EXCHANGE_TYPES') and settings.EXCHANGE_TYPES:
            matches = [k for k in settings.EXCHANGE_TYPES.keys() 
                      if exch.lower().startswith(k.lower())]
            if matches:
                return settings.EXCHANGE_TYPES[max(matches, key=len)]
    
        # Default fallback
        return "fanout"

    async def publish(self, exchange: str, obj: Any, routing_key: str = "") -> None:
        ex = await self.ensure_exchange(exchange)
        payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
            source=self._source,
            data=payload,
        )
        body = evt.model_dump_json(exclude_none=True).encode("utf-8")
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)

    async def publish_jsonz(
        self,
        exchange: str,
        obj: Any,
        routing_key: str = "",
        with_id: Optional[str] = None,
    ) -> str:
        ex = await self.ensure_exchange(exchange)
        payload = obj if not hasattr(obj, "model_dump") else obj.model_dump(by_alias=True)
        datajson = to_json(payload)
        datajsonZ = json_compress_str(datajson)
        wrapped: Any = (datajsonZ, with_id) if with_id else datajsonZ

        evt = CloudEvent.wrap(
            event_id=str(uuid.uuid4()),
            event_type=(obj.__class__.__name__ if obj is not None else "NullOrEmpty"),
            source=self._source,
            data=wrapped,
        )
        body = evt.model_dump_json(exclude_none=True).encode("utf-8")
        await ex.publish(aio_pika.Message(body=body), routing_key=routing_key)
        return datajsonZ