# Source code for aio_pika.robust_exchange

from logging import getLogger
from typing import Optional

import aiormq

from .channel import Channel
from .exchange import Exchange, ExchangeType


# Module-level logger, named after this module's import path.
log = getLogger(__name__)


class RobustExchange(Exchange):
    """Exchange that remembers its bindings so they can be re-applied.

    Every successful :meth:`bind` made with ``robust=True`` is recorded, and
    :meth:`restore` re-declares the exchange on a new channel and replays the
    recorded bindings.
    """

    def __init__(
        self,
        connection,
        channel: aiormq.Channel,
        name: str,
        type: ExchangeType = ExchangeType.DIRECT,
        *,
        auto_delete: Optional[bool],
        durable: Optional[bool],
        internal: Optional[bool],
        passive: Optional[bool],
        arguments: Optional[dict] = None
    ) -> None:
        """Initialise the exchange and an empty binding registry.

        All parameters are forwarded unchanged to ``Exchange.__init__``.
        """
        super().__init__(
            connection=connection,
            channel=channel,
            name=name,
            type=type,
            auto_delete=auto_delete,
            durable=durable,
            internal=internal,
            passive=passive,
            arguments=arguments,
        )

        # Maps ``(exchange, routing_key)`` -> extra keyword arguments for
        # ``bind``.  Keying on the routing key as well as the exchange lets
        # several bindings to the same exchange coexist; keying on the
        # exchange alone would let a later bind overwrite an earlier one.
        self._bindings = {}

    async def restore(self, channel: Channel) -> None:
        """Re-attach to ``channel``, re-declare and replay recorded bindings.

        :param channel: channel wrapper whose underlying ``_channel`` is
            adopted by this exchange.
        """
        self._channel = channel._channel

        # An exchange with an empty name is neither declared nor re-bound.
        # NOTE(review): presumably this is the broker's default exchange,
        # which cannot be declared -- confirm.
        if self.name == "":
            return

        await self.declare()

        # Iterate over a snapshot: ``bind`` writes to ``_bindings`` and other
        # tasks may bind/unbind while we are suspended in ``await``, which
        # would otherwise raise "dictionary changed size during iteration".
        for (exchange, routing_key), kwargs in tuple(self._bindings.items()):
            await self.bind(exchange, routing_key, **kwargs)

    async def bind(
        self,
        exchange,
        routing_key: str = "",
        *,
        arguments: Optional[dict] = None,
        timeout: Optional[int] = None,
        robust: bool = True
    ):
        """Bind via ``Exchange.bind`` and optionally record the binding.

        :param exchange: forwarded to ``Exchange.bind``; must be hashable
            because it is used as part of the registry key.
        :param routing_key: forwarded to ``Exchange.bind``.
        :param arguments: forwarded to ``Exchange.bind``.
        :param timeout: forwarded to ``Exchange.bind``; not recorded, so it
            is not reused by :meth:`restore`.
        :param robust: when true, remember the binding so that
            :meth:`restore` replays it.
        :return: whatever ``Exchange.bind`` returns.
        """
        result = await super().bind(
            exchange,
            routing_key=routing_key,
            arguments=arguments,
            timeout=timeout,
        )

        # Record only after the bind succeeded, so a failed bind is never
        # replayed.
        if robust:
            self._bindings[(exchange, routing_key)] = dict(
                arguments=arguments,
            )

        return result

    async def unbind(
        self,
        exchange,
        routing_key: str = "",
        arguments: Optional[dict] = None,
        timeout: Optional[int] = None,
    ):
        """Unbind via ``Exchange.unbind`` and forget the recorded binding.

        :param exchange: forwarded to ``Exchange.unbind``.
        :param routing_key: forwarded to ``Exchange.unbind``.
        :param arguments: forwarded to ``Exchange.unbind``.
        :param timeout: forwarded to ``Exchange.unbind``.
        :return: whatever ``Exchange.unbind`` returns.
        """
        result = await super().unbind(
            exchange,
            routing_key,
            arguments=arguments,
            timeout=timeout,
        )
        # Drop only the binding for this routing key; a missing entry (for
        # example one made with ``robust=False``) is not an error.
        self._bindings.pop((exchange, routing_key), None)
        return result


# Names exported by a star-import of this module.
__all__ = ("RobustExchange",)