Source code for aio_pika.robust_exchange

import asyncio
import warnings
from typing import Any, Dict, Union

import aiormq
from pamqp.common import Arguments

from .abc import (
    AbstractChannel, AbstractExchange, AbstractRobustExchange,
    ExchangeParamType, TimeoutType,
)
from .exchange import Exchange, ExchangeType
from .log import get_logger


log = get_logger(__name__)


[docs]class RobustExchange(Exchange, AbstractRobustExchange): """ Exchange abstraction """ _bindings: Dict[Union[AbstractExchange, str], Dict[str, Any]] def __init__( self, channel: AbstractChannel, name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT, *, auto_delete: bool = False, durable: bool = False, internal: bool = False, passive: bool = False, arguments: Arguments = None, ): super().__init__( channel=channel, name=name, type=type, auto_delete=auto_delete, durable=durable, internal=internal, passive=passive, arguments=arguments, ) self._bindings = {} self.__restore_lock = asyncio.Lock() async def restore(self, channel: Any = None) -> None: if channel is not None: warnings.warn( "Channel argument will be ignored because you " "don't need to pass this anymore.", DeprecationWarning, ) async with self.__restore_lock: try: # special case for default exchange if self.name == "": return await self.declare() for exchange, kwargs in tuple(self._bindings.items()): await self.bind(exchange, **kwargs) except Exception: raise
[docs] async def bind( self, exchange: ExchangeParamType, routing_key: str = "", *, arguments: Arguments = None, timeout: TimeoutType = None, robust: bool = True, ) -> aiormq.spec.Exchange.BindOk: result = await super().bind( exchange, routing_key=routing_key, arguments=arguments, timeout=timeout, ) if robust: self._bindings[exchange] = dict( routing_key=routing_key, arguments=arguments, ) return result
[docs] async def unbind( self, exchange: ExchangeParamType, routing_key: str = "", arguments: Arguments = None, timeout: TimeoutType = None, ) -> aiormq.spec.Exchange.UnbindOk: result = await super().unbind( exchange, routing_key, arguments=arguments, timeout=timeout, ) self._bindings.pop(exchange, None) return result
__all__ = ("RobustExchange",)