Source code for aio_pika.robust_exchange

from logging import getLogger
from typing import Any, Dict, Union

import aiormq
from pamqp.common import Arguments

from .abc import (
    AbstractChannel, AbstractConnection, AbstractExchange,
    AbstractRobustChannel, AbstractRobustExchange, ExchangeParamType,
    TimeoutType,
)
from .exchange import Exchange, ExchangeType


log = getLogger(__name__)


[docs]class RobustExchange(Exchange, AbstractRobustExchange): """ Exchange abstraction """ _bindings: Dict[Union[AbstractExchange, str], Dict[str, Any]] def __init__( self, connection: AbstractConnection, channel: AbstractChannel, name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT, *, auto_delete: bool = False, durable: bool = False, internal: bool = False, passive: bool = False, arguments: Arguments = None ): super().__init__( connection=connection, channel=channel, name=name, type=type, auto_delete=auto_delete, durable=durable, internal=internal, passive=passive, arguments=arguments, ) self._bindings = dict() async def restore(self, channel: AbstractRobustChannel) -> None: self._channel = channel if self.name == "": return await self.declare() for exchange, kwargs in tuple(self._bindings.items()): await self.bind(exchange, **kwargs)
[docs] async def bind( self, exchange: ExchangeParamType, routing_key: str = "", *, arguments: Arguments = None, timeout: TimeoutType = None, robust: bool = True ) -> aiormq.spec.Exchange.BindOk: await self.connection.connected.wait() result = await super().bind( exchange, routing_key=routing_key, arguments=arguments, timeout=timeout, ) if robust: self._bindings[exchange] = dict( routing_key=routing_key, arguments=arguments, ) return result
[docs] async def unbind( self, exchange: ExchangeParamType, routing_key: str = "", arguments: Arguments = None, timeout: TimeoutType = None, ) -> aiormq.spec.Exchange.UnbindOk: await self.connection.connected.wait() result = await super().unbind( exchange, routing_key, arguments=arguments, timeout=timeout, ) self._bindings.pop(exchange, None) return result
__all__ = ("RobustExchange",)