Quick start

Some useful examples.

Simple consumer

import asyncio
import aio_pika


async def main(loop):
    """Consume ``test_queue`` and print every message body.

    Connects to the broker at 127.0.0.1 as guest, declares an auto-delete
    queue and iterates over it. Iteration stops as soon as a decoded body
    contains the queue's name.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    queue_name = "test_queue"

    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    # Leaving this context closes the connection.
    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async with queue.iterator() as incoming_messages:
            async for incoming in incoming_messages:
                async with incoming.process():
                    payload = incoming.body
                    print(payload)

                    # A body mentioning the queue name is the stop signal.
                    if queue.name in payload.decode():
                        break


if __name__ == "__main__":
    # Run the consumer on the default event loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        # Close the loop even when main() raises, so it is never left open.
        loop.close()

Simple publisher

import asyncio
import aio_pika


async def main(loop):
    """Publish one ``Hello test_queue`` message through the default exchange.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    routing_key = "test_queue"

    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    # Leaving this context closes the connection.
    async with connection:
        channel = await connection.channel()
        exchange = channel.default_exchange

        # The body names the routing key it is sent with.
        payload = "Hello {}".format(routing_key).encode()
        await exchange.publish(
            aio_pika.Message(body=payload), routing_key=routing_key
        )


if __name__ == "__main__":
    # Run the publisher on the default event loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        # Close the loop even when main() raises, so it is never left open.
        loop.close()

Asynchronous message processing

import asyncio
import aio_pika


async def process_message(message: aio_pika.IncomingMessage):
    """Print one delivered message's body, then wait for a second.

    Everything runs inside the ``message.process()`` async context.

    :param message: the incoming message handed over by the consumer.
    """
    processing = message.process()
    async with processing:
        print(message.body)
        # Wait one second without blocking the event loop.
        await asyncio.sleep(1)


async def main(loop):
    """Start consuming ``test_queue`` with ``process_message`` as callback.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    :return: the open connection; the caller is responsible for closing it.
    """
    queue_name = "test_queue"

    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )
    channel = await connection.channel()

    # Upper bound on how many messages are processed at the same time.
    await channel.set_qos(prefetch_count=100)

    queue = await channel.declare_queue(queue_name, auto_delete=True)

    # Register the callback and return right away, without waiting
    # for any message to arrive.
    await queue.consume(process_message)

    return connection


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # main() only registers the consumer and hands back the open connection.
    connection = loop.run_until_complete(main(loop))

    try:
        # Keep the loop alive so the consumer callback keeps being invoked.
        loop.run_forever()
    finally:
        try:
            loop.run_until_complete(connection.close())
        finally:
            # Release the loop as well, even if closing the connection fails.
            loop.close()

Working with RabbitMQ transactions

import asyncio
import aio_pika


async def main(loop):
    """Publish to ``test_queue`` using channel transactions in three ways.

    1. ``async with channel.transaction()`` wrapped around ten publishes.
    2. A manually selected transaction that is committed.
    3. A manually selected transaction that is rolled back.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    routing_key = "test_queue"

    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    async with connection:
        # Transactions conflict with `publisher_confirms`, so disable it.
        channel = await connection.channel(publisher_confirms=False)

        # 1. Transaction driven by the async context manager: the messages
        #    are published inside it and are not delivered before commit.
        async with channel.transaction():
            for number in range(10):
                numbered = aio_pika.Message(
                    body="Hello #{}".format(number).encode()
                )
                await channel.default_exchange.publish(
                    numbered, routing_key=routing_key
                )

        # 2. Transaction driven by hand: select, publish, commit.
        committed_tx = channel.transaction()
        await committed_tx.select()

        greeting = aio_pika.Message(
            body="Hello {}".format(routing_key).encode()
        )
        await channel.default_exchange.publish(
            greeting, routing_key=routing_key
        )

        await committed_tx.commit()
        # NOTE(review): confirm the installed aio_pika Transaction still
        # provides close() - TODO verify against the library version in use.
        committed_tx.close()

        # 3. Transaction driven by hand: select, publish, roll back.
        rolled_back_tx = channel.transaction()
        await rolled_back_tx.select()

        rejected = aio_pika.Message(body="Should be rejected".encode())
        await channel.default_exchange.publish(
            rejected, routing_key=routing_key
        )

        await rolled_back_tx.rollback()
        rolled_back_tx.close()


if __name__ == "__main__":
    # Run the transaction example on the default event loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        # Close the loop even when main() raises, so it is never left open.
        loop.close()

Get single message example

import asyncio
from aio_pika import connect_robust, Message


async def main(loop):
    """Publish one message to a bound queue and fetch it back with ``get``.

    Declares an exchange named ``direct`` and the queue ``test_queue``,
    binds them, publishes a ``Hello`` message, retrieves it with
    ``queue.get``, acknowledges it and finally unbinds and deletes the
    queue.

    The connection is used as an async context manager so that it is closed
    even when one of the steps raises (for example when ``queue.get`` fails
    to return a message).

    :param loop: event loop handed to ``connect_robust``.
    """
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    queue_name = "test_queue"
    routing_key = "test_queue"

    # Leaving this context closes the connection on success and on failure.
    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange("direct", auto_delete=True)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        # Binding queue
        await queue.bind(exchange, routing_key)

        await exchange.publish(
            Message(
                bytes("Hello", "utf-8"),
                content_type="text/plain",
                headers={"foo": "bar"},
            ),
            routing_key,
        )

        # Receiving message
        incoming_message = await queue.get(timeout=5)

        # Confirm message
        await incoming_message.ack()

        await queue.unbind(exchange, routing_key)
        await queue.delete()


if __name__ == "__main__":
    # Run the example on the default event loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        # Close the loop whether or not main() succeeded.
        loop.close()

Tornado example

import asyncio
import tornado.ioloop
import tornado.web

from aio_pika import connect_robust, Message

# Select Tornado's asyncio-backed IOLoop implementation.
tornado.ioloop.IOLoop.configure("tornado.platform.asyncio.AsyncIOLoop")
io_loop = tornado.ioloop.IOLoop.current()
# Install the IOLoop's underlying asyncio loop as the current asyncio loop.
# This runs before QUEUE is created below; keep the statement order as is.
asyncio.set_event_loop(io_loop.asyncio_loop)


# In-process buffer between AMQP and HTTP: make_app() registers QUEUE.put as
# the consumer callback and SubscriberHandler.get() awaits QUEUE.get().
QUEUE = asyncio.Queue()


class SubscriberHandler(tornado.web.RequestHandler):
    """HTTP handler that answers GET with the next buffered message body."""

    async def get(self):
        """Wait for the next item on ``QUEUE`` and reply with its body."""
        incoming = await QUEUE.get()
        payload = incoming.body
        self.finish(payload)


class PublisherHandler(tornado.web.RequestHandler):
    """HTTP handler that publishes the POST body with routing key ``test``."""

    async def post(self):
        """Publish the request body on a fresh channel, then reply ``OK``."""
        amqp_connection = self.application.settings["amqp_connection"]
        publish_channel = await amqp_connection.channel()

        try:
            exchange = publish_channel.default_exchange
            outgoing = Message(body=self.request.body)
            await exchange.publish(outgoing, routing_key="test")
        finally:
            # The channel is opened per request, so always close it again.
            await publish_channel.close()

        self.finish("OK")


async def make_app():
    """Connect to the broker, start consuming and build the Tornado app.

    Messages from the auto-delete queue ``test`` are pushed onto ``QUEUE``
    (consumed with ``no_ack=True``). The connection is stored in the
    application settings under ``amqp_connection``.

    :return: the configured ``tornado.web.Application``.
    """
    amqp_connection = await connect_robust()

    consumer_channel = await amqp_connection.channel()
    test_queue = await consumer_channel.declare_queue("test", auto_delete=True)
    await test_queue.consume(QUEUE.put, no_ack=True)

    routes = [
        (r"/publish", PublisherHandler),
        (r"/subscribe", SubscriberHandler),
    ]
    return tornado.web.Application(routes, amqp_connection=amqp_connection)


if __name__ == "__main__":
    # Build the application on the asyncio loop that backs the IOLoop.
    app = io_loop.asyncio_loop.run_until_complete(make_app())
    # Serve HTTP on port 8888.
    app.listen(8888)

    # Hand control to Tornado's current IOLoop.
    current_io_loop = tornado.ioloop.IOLoop.current()
    current_io_loop.start()

External credentials example

import asyncio
import aio_pika
import ssl


async def main(loop):
    """Publish one message over an SSL connection set up with certificates.

    Connects to 127.0.0.1 with an empty login, ``ssl=True`` and certificate
    files ``cacert.pem`` / ``cert.pem`` / ``key.pem``, then publishes
    ``Hello test_queue`` through the default exchange.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    routing_key = "test_queue"

    # Certificate material for the connection; the peer certificate is
    # required (ssl.CERT_REQUIRED).
    ssl_options = {
        "ca_certs": "cacert.pem",
        "certfile": "cert.pem",
        "keyfile": "key.pem",
        "cert_reqs": ssl.CERT_REQUIRED,
    }

    connection = await aio_pika.connect_robust(
        host="127.0.0.1",
        login="",
        ssl=True,
        ssl_options=ssl_options,
        loop=loop,
    )

    # Leaving this context closes the connection.
    async with connection:
        channel = await connection.channel()
        exchange = channel.default_exchange

        payload = "Hello {}".format(routing_key).encode()
        await exchange.publish(
            aio_pika.Message(body=payload), routing_key=routing_key
        )


if __name__ == "__main__":
    # Run the example on the default event loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        # Close the loop even when main() raises, so it is never left open.
        loop.close()

Connection pooling

import asyncio
import aio_pika
from aio_pika.pool import Pool


async def main():
    """Publish 10000 messages and consume them through pooled channels.

    Two pools are built: a connection pool (at most 2 connections) and a
    channel pool (at most 10 channels) whose factory takes a connection
    from the first pool. One background task consumes ``pool_queue`` while
    the publishers run concurrently.

    ``consume()`` has no exit condition of its own, so the final
    ``await task`` keeps ``main`` running until the queue iterator stops
    or the task is cancelled.
    """
    loop = asyncio.get_event_loop()

    async def get_connection():
        """Pool factory: open one robust connection."""
        return await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    connection_pool = Pool(get_connection, max_size=2, loop=loop)

    async def get_channel() -> aio_pika.Channel:
        """Pool factory: open a channel on a pooled connection."""
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool = Pool(get_channel, max_size=10, loop=loop)
    queue_name = "pool_queue"

    async def consume():
        """Print and acknowledge every message arriving on the queue."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)

            queue = await channel.declare_queue(
                queue_name, durable=False, auto_delete=False
            )

            async with queue.iterator() as queue_iter:
                async for message in queue_iter:
                    print(message)
                    await message.ack()

    async def publish():
        """Publish one message naming the pooled channel that sent it."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.default_exchange.publish(
                aio_pika.Message(("Channel: %r" % channel).encode()),
                queue_name,
            )

    async with connection_pool, channel_pool:
        task = loop.create_task(consume())
        # asyncio.wait() must be given tasks/futures: passing bare coroutines
        # is deprecated since Python 3.8 and rejected from Python 3.11 on.
        publishers = [loop.create_task(publish()) for _ in range(10000)]
        await asyncio.wait(publishers)
        await task


if __name__ == "__main__":
    # Run the pooling example on the default event loop.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        # Close the loop even when main() raises, so it is never left open.
        loop.close()