Source code for aio_pika.exchange

import asyncio
from enum import Enum, unique
from logging import getLogger
from typing import Optional

from pika.channel import Channel
from .common import BaseChannel, FutureStore
from .message import Message
from .tools import create_future

log = getLogger(__name__)


@unique
[docs]class ExchangeType(Enum): FANOUT = 'fanout' DIRECT = 'direct' TOPIC = 'topic' HEADERS = 'headers' X_DELAYED_MESSAGE = 'x-delayed-message'
[docs]class Exchange(BaseChannel): """ Exchange abstraction """ __slots__ = 'name', '__type', '__publish_method', 'arguments', 'durable', 'auto_delete', 'internal', '_channel' def __init__(self, channel: Channel, publish_method, name: str, type: ExchangeType=ExchangeType.DIRECT, *, auto_delete: Optional[bool], durable: Optional[bool], internal: Optional[bool], arguments: dict=None, loop: asyncio.AbstractEventLoop, future_store: FutureStore): super().__init__(loop, future_store) if not arguments: arguments = {} self._channel = channel self.__publish_method = publish_method self.__type = type.value self.name = name self.auto_delete = auto_delete self.durable = durable self.internal = internal self.arguments = arguments def __str__(self): return self.name def __repr__(self): return "<Exchange(%s): auto_delete=%s, durable=%s, arguments=%r)>" % ( self, self.auto_delete, self.durable, self.arguments ) @BaseChannel._ensure_channel_is_open def declare(self, timeout: int=None): future = self._create_future(timeout=timeout) self._channel.exchange_declare( future.set_result, self.name, self.__type, durable=self.durable, auto_delete=self.auto_delete, internal=self.internal, arguments=self.arguments, ) return future @BaseChannel._ensure_channel_is_open
[docs] def bind(self, exchange, routing_key: str='', *, arguments=None, timeout: int = None) -> asyncio.Future: """ A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange. Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we're going to call it a binding key. .. code-block:: python client = await connect() routing_key = 'simple_routing_key' src_exchange_name = "source_exchange" dest_exchange_name = "destination_exchange" channel = await client.channel() src_exchange = await channel.declare_exchange(src_exchange_name, auto_delete=True) dest_exchange = await channel.declare_exchange(dest_exchange_name, auto_delete=True) queue = await channel.declare_queue(auto_delete=True) await queue.bind(dest_exchange, routing_key) await dest_exchange.bind(src_exchange, routing_key) :param exchange: :class:`aio_pika.exchange.Exchange` instance :param routing_key: routing key :param arguments: additional arguments (will be passed to `pika`) :param timeout: execution timeout :return: :class:`None` """ log.debug( "Binding exchange %r to exchange %r, routing_key=%r, arguments=%r", self, exchange, routing_key, arguments ) f = self._create_future(timeout) self._channel.exchange_bind( f.set_result, self.name, exchange.name, routing_key=routing_key, arguments=arguments ) return f
@BaseChannel._ensure_channel_is_open
[docs] def unbind(self, exchange, routing_key: str = '', arguments: dict = None, timeout: int = None) -> asyncio.Future: """ Remove exchange-to-exchange binding for this :class:`Exchange` instance :param exchange: :class:`aio_pika.exchange.Exchange` instance :param routing_key: routing key :param arguments: additional arguments (will be passed to `pika`) :param timeout: execution timeout :return: :class:`None` """ log.debug( "Unbinding exchange %r from exchange %r, routing_key=%r, arguments=%r", self, exchange, routing_key, arguments ) f = self._create_future(timeout) self._channel.exchange_unbind( f.set_result, self.name, exchange.name, routing_key=routing_key, arguments=arguments ) return f
@BaseChannel._ensure_channel_is_open @asyncio.coroutine
[docs] def publish(self, message: Message, routing_key, *, mandatory=True, immediate=False): """ Publish the message to the queue. `aio_pika` use `publisher confirms`_ extension for message delivery. .. _publisher confirms: https://www.rabbitmq.com/confirms.html """ log.debug("Publishing message via exchange %s: %r", self, message) if self.internal: # Caught on the client side to prevent channel closure raise ValueError("cannot publish to internal exchange: '%s'!" % self.name) return ( yield from self.__publish_method( self.name, routing_key, message.body, properties=message.properties, mandatory=mandatory, immediate=immediate ) )
@BaseChannel._ensure_channel_is_open
[docs] def delete(self, if_unused=False) -> asyncio.Future: """ Delete the queue :param if_unused: perform deletion when queue has no bindings. """ log.info("Deleting %r", self) self._futures.reject_all(RuntimeError("Exchange was deleted")) future = create_future(loop=self.loop) self._channel.exchange_delete(future.set_result, self.name, if_unused=if_unused) return future
__all__ = ('Exchange',)