Source code for aio_pika.robust_channel

from logging import getLogger
from typing import Union
from warnings import warn

import aiormq

from .channel import Channel
from .exchange import Exchange, ExchangeType
from .queue import Queue
from .robust_exchange import RobustExchange
from .robust_queue import RobustQueue
from .types import TimeoutType


# Module-level logger; RobustChannel uses it to report channel closure.
log = getLogger(__name__)


class RobustChannel(Channel):
    """Channel that can re-create its state after being reopened.

    The channel remembers the last QoS settings applied through
    :meth:`set_qos` as well as the exchanges and queues declared with
    ``robust=True``. :meth:`restore` re-applies all of them, and
    :meth:`reopen` calls :meth:`restore` right after the parent class has
    reopened the channel.
    """

    QUEUE_CLASS = RobustQueue
    EXCHANGE_CLASS = RobustExchange

    def __init__(
        self,
        connection,
        channel_number: int = None,
        publisher_confirms: bool = True,
        on_return_raises=False,
    ):
        """
        :param connection: :class:`aio_pika.adapter.AsyncioConnection`
            instance
        :param channel_number: forwarded unchanged to :class:`Channel`
        :param publisher_confirms: False if you don't need delivery
            confirmations (in pursuit of performance)
        :param on_return_raises: forwarded unchanged to :class:`Channel`
        """
        super().__init__(
            connection=connection,
            channel_number=channel_number,
            publisher_confirms=publisher_confirms,
            on_return_raises=on_return_raises,
        )

        # Registries of objects to re-declare in restore(), keyed by name.
        self._exchanges = dict()
        self._queues = dict()

        # Last QoS settings requested through set_qos().
        self._prefetch_count = 0
        self._prefetch_size = 0
        self._global_ = False

    async def reopen(self):
        """Reopen the channel, then restore QoS, exchanges and queues."""
        await super().reopen()
        await self.restore()

    async def restore(self):
        """Re-apply the remembered QoS and restore tracked objects.

        Exchanges are restored before queues.
        """
        await self.set_qos(
            prefetch_count=self._prefetch_count,
            prefetch_size=self._prefetch_size,
            global_=self._global_,
        )

        # Iterate over snapshots: every ``await`` below yields to the event
        # loop, and a concurrent declare/delete on this channel would
        # otherwise raise "dictionary changed size during iteration".
        for exchange in tuple(self._exchanges.values()):
            await exchange.restore(self)

        for queue in tuple(self._queues.values()):
            await queue.restore(self)

    @staticmethod
    def _on_channel_close(sender, exc: Exception):
        """Log channel closure: error level with traceback when ``exc`` is
        set, debug level otherwise."""
        if exc:
            log.exception(
                "Robust channel %r has been closed.", sender, exc_info=exc,
            )
        else:
            log.debug("Robust channel %r has been closed.", sender)

    async def initialize(self, timeout: TimeoutType = None) -> None:
        """Initialize the channel and register the close-logging callback."""
        await super().initialize(timeout)
        self.add_close_callback(self._on_channel_close)

    async def set_qos(
        self,
        prefetch_count: int = 0,
        prefetch_size: int = 0,
        global_: bool = False,
        timeout: TimeoutType = None,
        all_channels: bool = None,
    ):
        """Apply QoS settings and remember them for :meth:`restore`.

        :param prefetch_count: passed to the parent ``set_qos``
        :param prefetch_size: passed to the parent ``set_qos``
        :param global_: passed to the parent ``set_qos``
        :param timeout: passed to the parent ``set_qos``
        :param all_channels: deprecated alias of ``global_``; when not
            ``None`` it overrides ``global_`` and a
            :class:`DeprecationWarning` is emitted
        :return: whatever the parent ``set_qos`` returns
        """
        if all_channels is not None:
            # stacklevel=2 attributes the warning to the calling code
            # rather than to this library module.
            warn(
                'Use "global_" instead of "all_channels"',
                DeprecationWarning,
                stacklevel=2,
            )
            global_ = all_channels

        # Stored before the call so the requested values are remembered
        # even if the call below fails.
        self._prefetch_count = prefetch_count
        self._prefetch_size = prefetch_size
        self._global_ = global_

        return await super().set_qos(
            prefetch_count=prefetch_count,
            prefetch_size=prefetch_size,
            global_=global_,
            timeout=timeout,
        )

    async def declare_exchange(
        self,
        name: str,
        type: Union[ExchangeType, str] = ExchangeType.DIRECT,
        durable: bool = None,
        auto_delete: bool = False,
        internal: bool = False,
        passive: bool = False,
        arguments: dict = None,
        timeout: TimeoutType = None,
        robust: bool = True,
    ) -> Exchange:
        """Declare an exchange and optionally track it for restoration.

        All arguments except ``robust`` are forwarded to the parent
        ``declare_exchange``.

        :param robust: when true (and ``internal`` is false) the exchange
            is remembered and restored by :meth:`restore`
        :return: the declared exchange
        """
        exchange = await super().declare_exchange(
            name=name,
            type=type,
            durable=durable,
            auto_delete=auto_delete,
            internal=internal,
            passive=passive,
            arguments=arguments,
            timeout=timeout,
        )

        # Internal exchanges are never tracked, regardless of ``robust``.
        if not internal and robust:
            self._exchanges[name] = exchange

        return exchange

    async def exchange_delete(
        self,
        exchange_name: str,
        timeout: TimeoutType = None,
        if_unused=False,
        nowait=False,
    ) -> aiormq.spec.Exchange.DeleteOk:
        """Delete an exchange and stop tracking it for restoration.

        The exchange is only forgotten after the parent call succeeded.
        """
        result = await super().exchange_delete(
            exchange_name=exchange_name,
            timeout=timeout,
            if_unused=if_unused,
            nowait=nowait,
        )

        self._exchanges.pop(exchange_name, None)

        return result

    async def declare_queue(
        self,
        name: str = None,
        *,
        durable: bool = None,
        exclusive: bool = False,
        passive: bool = False,
        auto_delete: bool = False,
        arguments: dict = None,
        timeout: TimeoutType = None,
        robust: bool = True
    ) -> Queue:
        """Declare a queue and optionally track it for restoration.

        All arguments except ``robust`` are forwarded to the parent
        ``declare_queue``.

        :param robust: when true the queue is remembered and restored by
            :meth:`restore`
        :return: the declared queue
        """
        queue = await super().declare_queue(
            name=name,
            durable=durable,
            exclusive=exclusive,
            passive=passive,
            auto_delete=auto_delete,
            arguments=arguments,
            timeout=timeout,
        )

        if robust:
            # Anonymous queues (no name given) must be registered under
            # the name the queue object actually carries. Keying them all
            # on ``None`` would make each one overwrite the previous and
            # would leave queue_delete() unable to forget them.
            self._queues[name or queue.name] = queue

        return queue

    async def queue_delete(
        self,
        queue_name: str,
        timeout: TimeoutType = None,
        if_unused: bool = False,
        if_empty: bool = False,
        nowait: bool = False,
    ):
        """Delete a queue and stop tracking it for restoration.

        The queue is only forgotten after the parent call succeeded.
        """
        result = await super().queue_delete(
            queue_name=queue_name,
            timeout=timeout,
            if_unused=if_unused,
            if_empty=if_empty,
            nowait=nowait,
        )

        self._queues.pop(queue_name, None)

        return result
# Names exported by ``from ... import *``.
__all__ = ("RobustChannel",)