Source code for aio_pika.robust_queue

import os
from base64 import b32encode
from collections import namedtuple
from logging import getLogger
from types import FunctionType

import aiormq

from .channel import Channel
from .exchange import ExchangeParamType
from .queue import ConsumerTag, Queue


log = getLogger(__name__)


DeclarationResult = namedtuple(
    "DeclarationResult", ("message_count", "consumer_count"),
)


[docs]class RobustQueue(Queue): __slots__ = ("_consumers", "_bindings") @staticmethod def _get_random_queue_name(): rb = os.urandom(16) return "amq_%s" % b32encode(rb).decode().replace("=", "").lower() def __init__( self, connection, channel: aiormq.Channel, name, durable, exclusive, auto_delete, arguments, passive: bool = False, ): super().__init__( connection=connection, channel=channel, name=name or self._get_random_queue_name(), durable=durable, exclusive=exclusive, auto_delete=auto_delete, arguments=arguments, passive=passive, ) self._consumers = {} self._bindings = {} async def restore(self, channel: Channel): self._channel = channel._channel await self.declare() for (exchange, routing_key), kwargs in self._bindings.items(): await self.bind(exchange, routing_key, **kwargs) for consumer_tag, kwargs in tuple(self._consumers.items()): await self.consume(consumer_tag=consumer_tag, **kwargs)
[docs] async def bind( self, exchange: ExchangeParamType, routing_key: str = None, *, arguments=None, timeout: int = None, robust: bool = True ): if routing_key is None: routing_key = self.name kwargs = dict(arguments=arguments, timeout=timeout) result = await super().bind( exchange=exchange, routing_key=routing_key, **kwargs ) if robust: self._bindings[(exchange, routing_key)] = kwargs return result
[docs] async def unbind( self, exchange: ExchangeParamType, routing_key: str = None, arguments: dict = None, timeout: int = None, ): if routing_key is None: routing_key = self.name result = await super().unbind( exchange, routing_key, arguments, timeout, ) self._bindings.pop((exchange, routing_key), None) return result
[docs] async def consume( self, callback: FunctionType, no_ack: bool = False, exclusive: bool = False, arguments: dict = None, consumer_tag=None, timeout=None, robust: bool = True, ) -> ConsumerTag: kwargs = dict( callback=callback, no_ack=no_ack, exclusive=exclusive, arguments=arguments, ) consumer_tag = await super().consume( consumer_tag=consumer_tag, **kwargs ) if robust: self._consumers[consumer_tag] = kwargs return consumer_tag
[docs] async def cancel( self, consumer_tag: ConsumerTag, timeout=None, nowait: bool = False ): result = await super().cancel(consumer_tag, timeout, nowait) self._consumers.pop(consumer_tag, None) return result
__all__ = ("RobustQueue",)