aio_pika package

aio_pika.connect(url: str = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop=None, connection_class=<class 'aio_pika.connection.Connection'>, **kwargs) → typing.Generator[[typing.Any, NoneType], aio_pika.connection.Connection][source]

Make a connection to the broker.

Example:

import aio_pika

async def main():
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

Connect to localhost with default credentials:

import aio_pika

async def main():
    connection = await aio_pika.connect()
Parameters:
  • url – RFC3986 formatted broker address. When None, the keyword arguments are used instead.
  • host – hostname of the broker
  • port – broker port. 5672 by default.
  • login – username string. ‘guest’ by default.
  • password – password string. ‘guest’ by default.
  • virtualhost – virtualhost parameter. ‘/’ by default.
  • ssl – use SSL for the connection. Should be used with additional kwargs. See the pika documentation for more info.
  • loop – Event loop (asyncio.get_event_loop() when None)
  • connection_class – Factory of a new connection
  • kwargs – additional parameters which will be passed to the pika connection.
Returns:

aio_pika.connection.Connection
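
The url argument and the keyword arguments describe the same settings. The following sketch uses only the standard library to show which part of a broker URL corresponds to which keyword name in the signature above. The helper name split_amqp_url is hypothetical, and treating an empty path as the default virtual host is an assumption of this sketch, not a statement about how aio_pika parses the URL.

```python
from urllib.parse import unquote, urlsplit


def split_amqp_url(url: str) -> dict:
    """Map the parts of a broker URL onto connect()'s keyword names.

    Hypothetical helper for illustration only; not part of aio_pika.
    """
    parts = urlsplit(url)
    # Assumption: an empty path (e.g. "amqp://host/") means the default
    # virtual host "/".
    path = unquote(parts.path[1:])
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or 5672,
        "login": unquote(parts.username or "guest"),
        "password": unquote(parts.password or "guest"),
        "virtualhost": path or "/",
    }


print(split_amqp_url("amqp://guest:guest@127.0.0.1/"))
```

Percent-encoded characters in the login, password or virtual host are decoded by the sketch, so a password such as p@ss would be written as p%40ss in the URL.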

aio_pika.connect_robust(url: str = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop=None, connection_class=<class 'aio_pika.robust_connection.RobustConnection'>, **kwargs) → typing.Generator[[typing.Any, NoneType], aio_pika.connection.Connection][source]

Make a robust connection to the broker.

This means that the connection state is restored after a reconnect: once the connection has been re-established, the channels, queues and exchanges are restored along with their bindings.

Example:

import aio_pika

async def main():
    connection = await aio_pika.connect_robust("amqp://guest:guest@127.0.0.1/")

Connect to localhost with default credentials:

import aio_pika

async def main():
    connection = await aio_pika.connect_robust()
Parameters:
  • url – RFC3986 formatted broker address. When None, the keyword arguments are used instead.
  • host – hostname of the broker
  • port – broker port. 5672 by default.
  • login – username string. ‘guest’ by default.
  • password – password string. ‘guest’ by default.
  • virtualhost – virtualhost parameter. ‘/’ by default.
  • ssl – use SSL for the connection. Should be used with additional kwargs. See the pika documentation for more info.
  • loop – Event loop (asyncio.get_event_loop() when None)
  • connection_class – Factory of a new connection
  • kwargs – additional parameters which will be passed to the pika connection.
Returns:

aio_pika.connection.Connection

class aio_pika.Connection(host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtual_host: str = '/', ssl: bool = False, *, loop=None, **kwargs)[source]

Connection abstraction

CHANNEL_CLASS

alias of Channel

add_close_callback(callback: typing.Callable[[], NoneType])[source]

Add a callback which will be called after the connection is closed.

An asyncio.Future will be passed as the first argument.

Example:

import aio_pika

async def main():
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )
    connection.add_close_callback(print)
    await connection.close()
    # <Future finished result='Normal shutdown'>
Returns:None
channel(channel_number: int = None, publisher_confirms: bool = True) → typing.Generator[[typing.Any, NoneType], aio_pika.channel.Channel][source]

Coroutine which returns new instance of Channel.

Example:

import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    channel1 = await connection.channel()
    await channel1.close()

    # Creates channel with specific channel number
    channel42 = await connection.channel(42)
    await channel42.close()

    # For working with transactions
    channel_no_confirms = await connection.channel(publisher_confirms=False)
    await channel_no_confirms.close()
Parameters:
  • publisher_confirms – if True, the aio_pika.Exchange.publish method returns a bool after the publish is complete. Otherwise the aio_pika.Exchange.publish method returns None.
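
Because channel() is a coroutine and a channel is closed with its own close() coroutine, it can be convenient to pair the two calls. The sketch below wraps them in an async context manager; open_channel is a hypothetical helper and not part of aio_pika, and it only relies on the connection.channel() and channel.close() calls shown in the example above.

```python
import contextlib


@contextlib.asynccontextmanager
async def open_channel(connection, **channel_kwargs):
    """Open a channel and always close it again (hypothetical helper)."""
    # connection.channel() is a coroutine, so it has to be awaited.
    channel = await connection.channel(**channel_kwargs)
    try:
        yield channel
    finally:
        # Runs on normal exit and when the body raises.
        await channel.close()
```

Inside a coroutine it could be used as `async with open_channel(connection, publisher_confirms=False) as channel:`; any keyword arguments are forwarded to channel() unchanged.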

close() → asyncio.tasks.Task[source]

Close AMQP connection

closing

Return a coroutine which finishes after the connection is closed.

Example:

import asyncio
import aio_pika

async def async_close(connection):
    await asyncio.sleep(2)
    await connection.close()

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )
    loop.create_task(async_close(connection))

    await connection.closing
connect() → aio_pika.adapter.AsyncioConnection[source]

Perform the connection. This method should be called after aio_pika.connection.Connection.__init__().

Note

This method is called by connect(). You shouldn’t call it explicitly.

is_closed

Whether this connection is closed.

class aio_pika.Channel(connection, loop: asyncio.events.AbstractEventLoop, future_store: aio_pika.common.FutureStore, channel_number: int = None, publisher_confirms: bool = True)[source]

Channel abstraction

Parameters:
  • connection – aio_pika.adapter.AsyncioConnection instance
  • loop – Event loop (asyncio.get_event_loop() when None)
  • future_store – aio_pika.common.FutureStore instance
  • publisher_confirms – False if you don’t need delivery confirmations (in pursuit of performance)
EXCHANGE_CLASS

alias of Exchange

QUEUE_CLASS

alias of Queue

closing

Return future which will be finished after channel close.

declare_queue(name: str = None, *, durable: bool = None, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: dict = None, timeout: int = None) → typing.Generator[[typing.Any, NoneType], aio_pika.queue.Queue][source]
Parameters:
  • name – queue name
  • durable – Durability (the queue survives a broker restart)
  • exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.
  • passive – Only check to see if the queue exists.
  • auto_delete – Delete the queue when the channel is closed.
  • arguments – pika additional arguments
  • timeout – execution timeout
Returns:

aio_pika.queue.Queue instance
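
A queue is usually declared and then bound to an exchange in one step. The following sketch combines declare_queue() with Queue.bind(); declare_bound_queue is a hypothetical helper, and the channel and exchange arguments are assumed to be objects obtained from connection.channel() and channel.declare_exchange().

```python
async def declare_bound_queue(channel, exchange, name, routing_key):
    """Declare a durable queue and bind it to an exchange (hypothetical helper)."""
    # durable=True asks the broker to keep the queue across a restart.
    queue = await channel.declare_queue(name, durable=True)
    # Queue.bind() returns a future; awaiting it waits for the binding.
    await queue.bind(exchange, routing_key)
    return queue
```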

class aio_pika.DeliveryMode[source]

An enumeration.

class aio_pika.Exchange(channel: pika.channel.Channel, publish_method, name: str, type: aio_pika.exchange.ExchangeType = <ExchangeType.DIRECT: 'direct'>, *, auto_delete: typing.Union[bool, NoneType], durable: typing.Union[bool, NoneType], internal: typing.Union[bool, NoneType], arguments: dict = None, loop: asyncio.events.AbstractEventLoop, future_store: aio_pika.common.FutureStore)[source]

Exchange abstraction

bind(exchange, routing_key: str = '', *, arguments=None, timeout: int = None) → asyncio.futures.Future[source]

A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.

Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.

import aio_pika

async def main():
    client = await aio_pika.connect()

    routing_key = 'simple_routing_key'
    src_exchange_name = "source_exchange"
    dest_exchange_name = "destination_exchange"

    channel = await client.channel()
    src_exchange = await channel.declare_exchange(src_exchange_name, auto_delete=True)
    dest_exchange = await channel.declare_exchange(dest_exchange_name, auto_delete=True)
    queue = await channel.declare_queue(auto_delete=True)

    # Messages flow: src_exchange -> dest_exchange -> queue
    await queue.bind(dest_exchange, routing_key)
    await dest_exchange.bind(src_exchange, routing_key)
Parameters:
  • exchange – aio_pika.exchange.Exchange instance
  • routing_key – routing key
  • arguments – additional arguments (will be passed to pika)
  • timeout – execution timeout
Returns:

None

delete(if_unused=False) → asyncio.futures.Future[source]

Delete the exchange

Parameters:if_unused – perform deletion only when the exchange has no bindings.
publish(message: aio_pika.message.Message, routing_key, *, mandatory=True, immediate=False)[source]

Publish the message to the queue. aio_pika uses the publisher confirms extension for message delivery.
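
As an illustration of publish(), the sketch below serializes a payload to JSON, wraps it in a message and publishes it with a routing key. publish_json is a hypothetical helper. The message class is passed in as message_cls (expected to be aio_pika.Message) so that the sketch stays independent of the installed aio_pika version, and it assumes that publish() is awaitable.

```python
import json


async def publish_json(exchange, message_cls, payload, routing_key):
    """Serialize payload as JSON and publish it (hypothetical helper).

    message_cls is expected to be aio_pika.Message.
    """
    body = json.dumps(payload).encode("utf-8")
    message = message_cls(body, content_type="application/json")
    # Assumption: publish() is awaitable. Its result is passed through
    # unchanged to the caller.
    return await exchange.publish(message, routing_key)
```

A call would look like `await publish_json(exchange, aio_pika.Message, {"id": 1}, "orders")`, where exchange comes from channel.declare_exchange().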

unbind(exchange, routing_key: str = '', arguments: dict = None, timeout: int = None) → asyncio.futures.Future[source]

Remove exchange-to-exchange binding for this Exchange instance

Parameters:
  • exchange – aio_pika.exchange.Exchange instance
  • routing_key – routing key
  • arguments – additional arguments (will be passed to pika)
  • timeout – execution timeout
Returns:

None

class aio_pika.ExchangeType[source]

An enumeration.

class aio_pika.Queue(loop: asyncio.events.AbstractEventLoop, future_store: aio_pika.common.FutureStore, channel: aio_pika.adapter.Channel, name, durable, exclusive, auto_delete, arguments)[source]

AMQP queue abstraction

bind(exchange: aio_pika.exchange.Exchange, routing_key: str = None, *, arguments=None, timeout: int = None) → asyncio.futures.Future[source]

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.

Parameters:
  • exchange – aio_pika.exchange.Exchange instance
  • routing_key – routing key
  • arguments – additional arguments (will be passed to pika)
  • timeout – execution timeout
Returns:

None

cancel(consumer_tag: str, timeout=None, nowait: bool = False)[source]

This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.

Parameters:
  • consumer_tag – consumer tag returned by consume()
  • timeout – execution timeout
  • nowait (bool) – Do not expect a Basic.CancelOk response
Returns:

Basic.CancelOk when operation completed successfully

consume(callback: function, no_ack: bool = False, exclusive: bool = False, arguments: dict = None, consumer_tag=None, timeout=None) → typing.Generator[[typing.Any, NoneType], str][source]

Start consuming the queue.

Parameters:
  • callback – Consuming callback. Could be a coroutine.
  • no_ack – if True you don’t need to call aio_pika.message.IncomingMessage.ack()
  • exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.
  • arguments – extended arguments for pika
  • consumer_tag – optional consumer tag
Returns:

consumer tag str
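
A minimal sketch of consume() with a plain (non-coroutine) callback follows. consume_bodies and handle_body are hypothetical names. The callback wraps the work in message.process(), which, per the process() documentation, rejects the message if the block raises an exception.

```python
async def consume_bodies(queue, handle_body):
    """Start consuming and return the consumer tag (hypothetical helper)."""

    def on_message(message):
        # If handle_body raises, process() rejects the message.
        with message.process():
            handle_body(message.body)

    # consume() is a coroutine that resolves to the consumer tag.
    consumer_tag = await queue.consume(on_message)
    return consumer_tag
```

The returned tag is the value that cancel() expects as its consumer_tag argument.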

declare(timeout: int = None, passive: bool = False) → asyncio.futures.Future[source]

Declare queue.

Parameters:
  • timeout – execution timeout
  • passive – Only check to see if the queue exists.
Returns:

None

delete(*, if_unused=True, if_empty=True, timeout=None) → asyncio.futures.Future[source]

Delete the queue.

Parameters:
  • if_unused – Perform delete only when unused
  • if_empty – Perform delete only when empty
  • timeout – execution timeout
Returns:

None

get(*, no_ack=False, timeout=None, fail=True) → typing.Generator[[typing.Any, NoneType], typing.Union[aio_pika.message.IncomingMessage, NoneType]][source]

Get message from the queue.

Parameters:
  • no_ack – if True you don’t need to call aio_pika.message.IncomingMessage.ack()
  • timeout – execution timeout
  • fail – when True, raise aio_pika.exceptions.QueueEmpty if the queue is empty; when False, return None instead of raising.
Returns:

aio_pika.message.IncomingMessage
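
When polling with get(), passing fail=False lets the caller test for an empty queue without handling an exception. The sketch below reads messages until the queue is empty or a limit is reached; drain is a hypothetical helper, and it assumes that get() returns None for an empty queue when fail=False, as the return annotation suggests.

```python
async def drain(queue, limit=100):
    """Collect up to `limit` message bodies from the queue (hypothetical helper)."""
    bodies = []
    for _ in range(limit):
        # Assumption: with fail=False an empty queue yields None rather than
        # raising aio_pika.exceptions.QueueEmpty.
        # no_ack=True means no explicit ack() call is needed.
        message = await queue.get(no_ack=True, fail=False)
        if message is None:
            break
        bodies.append(message.body)
    return bodies
```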

iterator() → aio_pika.queue.QueueIterator[source]

Returns an iterator for async for expression.

Full example:

import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue('test')

        async with queue.iterator() as q:
            async for message in q:
                print(message.body)

When your program runs with run_forever, the iterator will be closed in the background. In this case the context manager for the iterator may be skipped and the queue may be used in the “async for” expression directly.

import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()

        queue = await channel.declare_queue('test')

        async for message in queue:
            print(message.body)
Returns:QueueIterator
purge(timeout=None) → asyncio.futures.Future[source]

Purge all messages from the queue.

Parameters:timeout – execution timeout
Returns:None
unbind(exchange: aio_pika.exchange.Exchange, routing_key: str, arguments: dict = None, timeout: int = None) → asyncio.futures.Future[source]

Remove binding from exchange for this Queue instance

Parameters:
  • exchange – aio_pika.exchange.Exchange instance
  • routing_key – routing key
  • arguments – additional arguments (will be passed to pika)
  • timeout – execution timeout
Returns:

None

class aio_pika.Message(body: bytes, *, headers: dict = None, content_type: str = None, content_encoding: str = None, delivery_mode: aio_pika.message.DeliveryMode = None, priority: int = None, correlation_id=None, reply_to: str = None, expiration: typing.Union[int, datetime.datetime, float, datetime.timedelta, NoneType] = None, message_id: str = None, timestamp: typing.Union[int, datetime.datetime, float, datetime.timedelta, NoneType] = None, type: str = None, user_id: str = None, app_id: str = None)[source]

AMQP message abstraction

Creates a new instance of Message

Parameters:
  • body – message body
  • headers – message headers
  • content_type – content type
  • content_encoding – content encoding
  • delivery_mode – delivery mode
  • priority – priority
  • correlation_id – correlation id
  • reply_to – reply to
  • expiration – expiration in seconds (or datetime or timedelta)
  • message_id – message id
  • timestamp – timestamp
  • type – type
  • user_id – user id
  • app_id – app id
info() → dict[source]

Create a dict with message attributes

{
    "body_size": 100,
    "headers": {},
    "content_type": "text/plain",
    "content_encoding": "",
    "delivery_mode": DeliveryMode.NOT_PERSISTENT,
    "priority": 0,
    "correlation_id": "",
    "reply_to": "",
    "expiration": "",
    "message_id": "",
    "timestamp": "",
    "type": "",
    "user_id": "",
    "app_id": "",
}
lock()[source]

Set lock flag to True

locked

Whether the message is locked

Returns:bool
properties

Build pika.BasicProperties object

class aio_pika.IncomingMessage(channel: pika.channel.Channel, envelope, properties, body, no_ack: bool = False)[source]

An incoming message is similar to Message but has additional methods for message acknowledgement.

Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit (“manual”) client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:

  • basic.ack is used for positive acknowledgements
  • basic.nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1)
  • basic.reject is used for negative acknowledgements but has one limitation compared to basic.nack

Positive acknowledgements simply instruct RabbitMQ to record a message as delivered. Negative acknowledgements with basic.reject have the same effect. The difference is primarily in the semantics: positive acknowledgements assume a message was successfully processed while their negative counterpart suggests that a delivery wasn’t processed but still should be deleted.

Create an instance of IncomingMessage

Parameters:
  • channel – aio_pika.channel.Channel
  • envelope – pika envelope
  • properties – properties
  • body – message body
  • no_ack – no ack needed
ack(multiple: bool = False)[source]

Send basic.ack, which is used for positive acknowledgements.

Note

This method looks like a blocking method, but it just sends bytes to the socket and does not require any response from the broker.

Parameters:multiple – If set to True, the message’s delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the ack refers to a single message.
Returns:None
info() → dict[source]

Method returns dict representation of the message

process(requeue=False, reject_on_redelivered=False, ignore_processed=False)[source]

Context manager for processing the message

>>> def on_message_received(message: IncomingMessage):
...    with message.process():
...        # If an exception is raised,
...        # the message will be rejected
...        print(message.body)

Example with ignore_processed=True

>>> def on_message_received(message: IncomingMessage):
...    with message.process(ignore_processed=True):
...        # Now (with ignore_processed=True) you may reject (or ack) message manually too
...        if True:  # some reasonable condition here
...            message.reject()
...        print(message.body)
Parameters:
  • requeue – Requeue the message when an exception is raised.
  • reject_on_redelivered – When True, the message is rejected only if it was redelivered.
  • ignore_processed – Do nothing if the message has already been processed
reject(requeue=False)[source]

When requeue=True the message will be returned to queue. Otherwise message will be dropped.

Note

This method looks like a blocking method, but it just sends bytes to the socket and does not require any response from the broker.

Parameters:requeue – bool
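
As an alternative to process(), a message can be acknowledged or rejected by hand. The sketch below calls ack() when the work succeeds and reject() when it fails; handle_manually and work are hypothetical names, and only the ack() and reject() methods documented above are used.

```python
def handle_manually(message, work):
    """Ack on success, reject on failure (hypothetical helper).

    Returns True if the message was acknowledged.
    """
    try:
        work(message.body)
    except Exception:
        # requeue=False drops the message instead of returning it to the queue.
        message.reject(requeue=False)
        return False
    message.ack()
    return True
```

Passing requeue=True instead would return a failed message to the queue, which may be preferable for transient errors.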