from logging import getLogger
from typing import Union
from warnings import warn
import aiormq
from .channel import Channel
from .exchange import Exchange, ExchangeType
from .queue import Queue
from .robust_exchange import RobustExchange
from .robust_queue import RobustQueue
from .types import TimeoutType
log = getLogger(__name__)
class RobustChannel(Channel):
    """ Channel abstraction that survives reconnects.

    Remembers every robust exchange/queue declared through it and the
    last QoS settings, and re-applies all of them in :meth:`restore`
    after the underlying connection is re-established.
    """

    # Robust subclasses know how to re-declare themselves on restore().
    QUEUE_CLASS = RobustQueue
    EXCHANGE_CLASS = RobustExchange

    def __init__(
        self,
        connection,
        channel_number: int = None,
        publisher_confirms: bool = True,
        on_return_raises=False,
    ):
        """
        :param connection: :class:`aio_pika.adapter.AsyncioConnection` instance
        :param channel_number:
            specify the channel number explicitly
            (auto-assigned when :class:`None`)
        :param publisher_confirms:
            False if you don't need delivery confirmations
            (in pursuit of performance)
        :param on_return_raises:
            raise an exception when a mandatory message is returned
        """
        super().__init__(
            connection=connection,
            channel_number=channel_number,
            publisher_confirms=publisher_confirms,
            on_return_raises=on_return_raises,
        )

        # Registries of robust entities, re-declared by restore().
        self._exchanges = {}
        self._queues = {}

        # Last QoS settings requested via set_qos(), replayed on restore().
        self._prefetch_count = 0
        self._prefetch_size = 0
        self._global_ = False

    async def reopen(self):
        """Reopen the channel and restore all remembered state."""
        await super().reopen()
        await self.restore()

    async def restore(self):
        """Re-apply QoS and re-declare every remembered exchange and queue."""
        await self.set_qos(
            prefetch_count=self._prefetch_count,
            prefetch_size=self._prefetch_size,
            global_=self._global_,
        )

        for exchange in self._exchanges.values():
            await exchange.restore(self)

        for queue in self._queues.values():
            await queue.restore(self)

    @staticmethod
    def _on_channel_close(sender, exc: Exception):
        # Not inside an except block, so pass exc_info explicitly rather
        # than relying on log.exception()'s implicit sys.exc_info().
        if exc:
            log.error(
                "Robust channel %r has been closed.", sender, exc_info=exc,
            )
        else:
            log.debug("Robust channel %r has been closed.", sender)

    async def initialize(self, timeout: TimeoutType = None) -> None:
        await super().initialize(timeout)
        self.add_close_callback(self._on_channel_close)

    async def set_qos(
        self,
        prefetch_count: int = 0,
        prefetch_size: int = 0,
        global_: bool = False,
        timeout: TimeoutType = None,
        all_channels: bool = None,
    ):
        """Set QoS and remember the values so restore() can replay them.

        :param all_channels: deprecated alias for ``global_``.
        """
        if all_channels is not None:
            # stacklevel=2 points the warning at the caller's code.
            warn(
                'Use "global_" instead of "all_channels"',
                DeprecationWarning,
                stacklevel=2,
            )
            global_ = all_channels

        self._prefetch_count = prefetch_count
        self._prefetch_size = prefetch_size
        self._global_ = global_

        return await super().set_qos(
            prefetch_count=prefetch_count,
            prefetch_size=prefetch_size,
            global_=global_,
            timeout=timeout,
        )

    async def declare_exchange(
        self,
        name: str,
        type: Union[ExchangeType, str] = ExchangeType.DIRECT,
        durable: bool = None,
        auto_delete: bool = False,
        internal: bool = False,
        passive: bool = False,
        arguments: dict = None,
        timeout: TimeoutType = None,
        robust: bool = True,
    ) -> Exchange:
        """Declare an exchange; remember it for restore() unless it is
        internal or ``robust=False``."""
        exchange = await super().declare_exchange(
            name=name,
            type=type,
            durable=durable,
            auto_delete=auto_delete,
            internal=internal,
            passive=passive,
            arguments=arguments,
            timeout=timeout,
        )

        if not internal and robust:
            self._exchanges[name] = exchange

        return exchange

    async def exchange_delete(
        self,
        exchange_name: str,
        timeout: TimeoutType = None,
        if_unused=False,
        nowait=False,
    ) -> aiormq.spec.Exchange.DeleteOk:
        """Delete an exchange and drop it from the restore registry."""
        result = await super().exchange_delete(
            exchange_name=exchange_name,
            timeout=timeout,
            if_unused=if_unused,
            nowait=nowait,
        )

        self._exchanges.pop(exchange_name, None)

        return result

    async def declare_queue(
        self,
        name: str = None,
        *,
        durable: bool = None,
        exclusive: bool = False,
        passive: bool = False,
        auto_delete: bool = False,
        arguments: dict = None,
        timeout: TimeoutType = None,
        robust: bool = True
    ) -> Queue:
        """Declare a queue; remember it for restore() unless ``robust=False``."""
        queue = await super().declare_queue(
            name=name,
            durable=durable,
            exclusive=exclusive,
            passive=passive,
            auto_delete=auto_delete,
            arguments=arguments,
            timeout=timeout,
        )

        if robust:
            # BUGFIX: register under the actual (possibly server-generated)
            # queue name, not the requested one — when name is None every
            # server-named queue used to collide under the None key, so
            # restore()/queue_delete() could not track them correctly.
            self._queues[queue.name] = queue

        return queue

    async def queue_delete(
        self,
        queue_name: str,
        timeout: TimeoutType = None,
        if_unused: bool = False,
        if_empty: bool = False,
        nowait: bool = False,
    ):
        """Delete a queue and drop it from the restore registry."""
        result = await super().queue_delete(
            queue_name=queue_name,
            timeout=timeout,
            if_unused=if_unused,
            if_empty=if_empty,
            nowait=nowait,
        )

        self._queues.pop(queue_name, None)

        return result
# Explicit public API of this module.
__all__ = ("RobustChannel",)