johnbridges commited on
Commit
a3a1b05
·
1 Parent(s): 527d73d
Files changed (1) hide show
  1. rabbit_base.py +27 -6
rabbit_base.py CHANGED
@@ -1,25 +1,42 @@
1
  # rabbit_base.py
2
  from typing import Callable, Dict, List, Optional
3
  import aio_pika
4
-
5
  from config import settings
6
 
7
  ExchangeResolver = Callable[[str], str] # exchangeName -> exchangeType
8
 
9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
10
  class RabbitBase:
11
  def __init__(self, exchange_type_resolver: Optional[ExchangeResolver] = None):
12
  self._conn: Optional[aio_pika.RobustConnection] = None
13
  self._chan: Optional[aio_pika.RobustChannel] = None
14
  self._exchanges: Dict[str, aio_pika.Exchange] = {}
15
- self._exchange_type_resolver = (
16
- exchange_type_resolver or (lambda _: settings.RABBIT_EXCHANGE_TYPE)
17
  )
18
 
19
  async def connect(self) -> None:
20
  if self._conn and not self._conn.is_closed:
21
  return
22
- self._conn = await aio_pika.connect_robust(str(settings.AMQP_URL))
 
23
  self._chan = await self._conn.channel()
24
  await self._chan.set_qos(prefetch_count=settings.RABBIT_PREFETCH)
25
 
@@ -27,7 +44,7 @@ class RabbitBase:
27
  await self.connect()
28
  if name in self._exchanges:
29
  return self._exchanges[name]
30
- ex_type = self._exchange_type_resolver(name) # "fanout"|"topic"|"direct"|"headers"
31
  ex = await self._chan.declare_exchange(
32
  name, getattr(aio_pika.ExchangeType, ex_type), durable=True
33
  )
@@ -47,7 +64,11 @@ class RabbitBase:
47
  if ttl_ms:
48
  args["x-message-ttl"] = ttl_ms
49
  q = await self._chan.declare_queue(
50
- queue_name, durable=True, exclusive=False, auto_delete=True, arguments=args
 
 
 
 
51
  )
52
  for rk in routing_keys or [""]:
53
  await q.bind(ex, rk)
 
1
  # rabbit_base.py
2
  from typing import Callable, Dict, List, Optional
3
  import aio_pika
4
+ from urllib.parse import urlsplit, unquote
5
  from config import settings
6
 
7
  ExchangeResolver = Callable[[str], str] # exchangeName -> exchangeType
8
 
9
 
10
+ def _parse_amqp_url(url: str) -> dict:
11
+ """
12
+ Convert an AMQP URL into kwargs for aio_pika.connect_robust,
13
+ so we don't pass the raw URL (and leak the password in logs).
14
+ """
15
+ parts = urlsplit(url)
16
+ return {
17
+ "host": parts.hostname or "localhost",
18
+ "port": parts.port or (5671 if parts.scheme == "amqps" else 5672),
19
+ "login": parts.username or "guest",
20
+ "password": parts.password or "guest",
21
+ "virtualhost": unquote(parts.path[1:] or "/"),
22
+ "ssl": parts.scheme == "amqps",
23
+ }
24
+
25
+
26
  class RabbitBase:
27
  def __init__(self, exchange_type_resolver: Optional[ExchangeResolver] = None):
28
  self._conn: Optional[aio_pika.RobustConnection] = None
29
  self._chan: Optional[aio_pika.RobustChannel] = None
30
  self._exchanges: Dict[str, aio_pika.Exchange] = {}
31
+ self._exchange_type_resolver = exchange_type_resolver or (
32
+ lambda _: settings.RABBIT_EXCHANGE_TYPE
33
  )
34
 
35
  async def connect(self) -> None:
36
  if self._conn and not self._conn.is_closed:
37
  return
38
+ conn_kwargs = _parse_amqp_url(str(settings.AMQP_URL))
39
+ self._conn = await aio_pika.connect_robust(**conn_kwargs)
40
  self._chan = await self._conn.channel()
41
  await self._chan.set_qos(prefetch_count=settings.RABBIT_PREFETCH)
42
 
 
44
  await self.connect()
45
  if name in self._exchanges:
46
  return self._exchanges[name]
47
+ ex_type = self._exchange_type_resolver(name)
48
  ex = await self._chan.declare_exchange(
49
  name, getattr(aio_pika.ExchangeType, ex_type), durable=True
50
  )
 
64
  if ttl_ms:
65
  args["x-message-ttl"] = ttl_ms
66
  q = await self._chan.declare_queue(
67
+ queue_name,
68
+ durable=True,
69
+ exclusive=False,
70
+ auto_delete=True,
71
+ arguments=args,
72
  )
73
  for rk in routing_keys or [""]:
74
  await q.bind(ex, rk)