Quick start#

Some useful examples.

Simple consumer#

import asyncio
import logging

import aio_pika


async def main() -> None:
    """Consume ``test_queue`` until a message body mentions the queue name.

    Enables DEBUG logging, connects to the local broker, declares an
    auto-delete queue and prints every received body. Iteration stops after
    the first message whose decoded body contains the queue's name.
    """
    logging.basicConfig(level=logging.DEBUG)

    queue_name = "test_queue"
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    async with connection:
        channel = await connection.channel()

        # Prefetch limit: no more than 10 messages are taken in advance.
        await channel.set_qos(prefetch_count=10)

        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async with queue.iterator() as incoming:
            async for msg in incoming:
                # The stop decision is computed inside the processing
                # context and acted upon right after the context exits.
                async with msg.process():
                    print(msg.body)
                    is_last = queue.name in msg.body.decode()

                if is_last:
                    break


if __name__ == "__main__":
    asyncio.run(main())

Simple publisher#

import asyncio

import aio_pika


async def main() -> None:
    """Publish one greeting to ``test_queue`` through the default exchange."""
    routing_key = "test_queue"
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    async with connection:
        channel = await connection.channel()
        exchange = channel.default_exchange

        # The body is the UTF-8 encoded greeting that names the routing key.
        greeting = aio_pika.Message(body=f"Hello {routing_key}".encode())
        await exchange.publish(greeting, routing_key=routing_key)


if __name__ == "__main__":
    asyncio.run(main())

Asynchronous message processing#

import asyncio

import aio_pika


async def process_message(
    message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
    """Print the body of *message*, then wait one second.

    Both steps run inside the ``message.process()`` context manager.
    """
    processing = message.process()
    async with processing:
        payload = message.body
        print(payload)
        # Stand-in for real work: one second of asynchronous waiting.
        await asyncio.sleep(1)


async def main() -> None:
    """Consume ``test_queue`` with the ``process_message`` callback.

    Connects to the local broker, declares an auto-delete queue, registers
    ``process_message`` as the consumer callback and then waits until the
    task is terminated.
    """
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    queue_name = "test_queue"

    # The whole setup runs inside the connection context so the connection
    # is closed even when channel/queue setup fails, not only when the
    # final wait is interrupted.
    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Maximum message count which will be processing at the same time.
        await channel.set_qos(prefetch_count=100)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        await queue.consume(process_message)

        # Wait until terminated: this future is never resolved.
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

Working with RabbitMQ transactions#

import asyncio

import aio_pika


async def main() -> None:
    """Demonstrate channel transactions: context manager, commit, rollback.

    Three transactions are run on one channel:

    1. ten messages published inside ``async with channel.transaction()``;
    2. one message published between manual ``select()`` and ``commit()``;
    3. one message published between manual ``select()`` and ``rollback()``.
    """
    routing_key = "test_queue"
    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/",
    )

    async with connection:
        # Transactions conflict with `publisher_confirms`, so it is disabled.
        channel = await connection.channel(publisher_confirms=False)

        # 1. Transaction driven by the async context manager. Messages are
        #    published, but delivery will not be done before the
        #    transaction is committed.
        async with channel.transaction():
            for number in range(10):
                numbered = aio_pika.Message(
                    body="Hello #{}".format(number).encode(),
                )
                await channel.default_exchange.publish(
                    numbered, routing_key=routing_key,
                )

        # 2. Manual transaction that is committed.
        committed_tx = channel.transaction()
        await committed_tx.select()

        greeting = aio_pika.Message(
            body="Hello {}".format(routing_key).encode(),
        )
        await channel.default_exchange.publish(
            greeting, routing_key=routing_key,
        )

        await committed_tx.commit()

        # 3. Manual transaction that is rolled back.
        rolled_back_tx = channel.transaction()
        await rolled_back_tx.select()

        doomed = aio_pika.Message(body="Should be rejected".encode())
        await channel.default_exchange.publish(
            doomed, routing_key=routing_key,
        )

        await rolled_back_tx.rollback()


if __name__ == "__main__":
    asyncio.run(main())

Get single message example#

import asyncio
from typing import Optional

from aio_pika import Message, connect_robust
from aio_pika.abc import AbstractIncomingMessage


async def main() -> None:
    """Publish one message and fetch it back with a single ``queue.get``.

    Declares an auto-delete exchange and queue, binds them, publishes a
    ``text/plain`` message, then tries to receive one message. A received
    message is acknowledged; otherwise ``"Queue empty"`` is printed.
    Finally the queue is unbound and deleted.
    """
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/?name=aio-pika%20example",
    )

    queue_name = "test_queue"
    routing_key = "test_queue"

    # Run everything inside the connection context so the connection is
    # closed even if declaring, publishing or receiving raises.
    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange("direct", auto_delete=True)

        # Declaring queue
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        # Binding queue
        await queue.bind(exchange, routing_key)

        await exchange.publish(
            Message(
                bytes("Hello", "utf-8"),
                content_type="text/plain",
                headers={"foo": "bar"},
            ),
            routing_key,
        )

        # Receiving one message; with fail=False the result may be None,
        # which is why the annotation is Optional.
        incoming_message: Optional[AbstractIncomingMessage] = await queue.get(
            timeout=5, fail=False
        )
        if incoming_message:
            # Confirm message
            await incoming_message.ack()
        else:
            print("Queue empty")

        await queue.unbind(exchange, routing_key)
        await queue.delete()


if __name__ == "__main__":
    asyncio.run(main())

Set logging level#

Sometimes you want to see only your own debug logs, but calling logging.basicConfig(level=logging.DEBUG) sets the debug log level for all loggers, including all of aio_pika’s modules. If you want to set the logging level for aio_pika independently, see the following example:

import logging

from aio_pika import logger


# Only the logger imported from aio_pika is adjusted here: its threshold is
# set to ERROR.
logger.setLevel(logging.ERROR)

Tornado example#

import asyncio

import tornado.ioloop
import tornado.web

from aio_pika import Message, connect_robust


class Base:
    """Holder for the class-level queue that handlers read from."""

    # Declared here without a value; assigned to an asyncio.Queue instance
    # in make_app() before the application is created.
    QUEUE: asyncio.Queue


class SubscriberHandler(tornado.web.RequestHandler, Base):
    """HTTP handler that answers GET with the body of the next queued message."""

    async def get(self) -> None:
        """Wait for one item from ``QUEUE`` and send its body as the response."""
        received = await self.QUEUE.get()
        payload = received.body
        await self.finish(payload)


class PublisherHandler(tornado.web.RequestHandler):
    """HTTP handler that publishes the POST request body to the broker."""

    async def post(self) -> None:
        """Publish the request body with routing key ``"test"`` and reply OK.

        A channel is opened on the application's AMQP connection for this
        request and closed again whether or not publishing succeeds.
        """
        amqp = self.application.settings["amqp_connection"]
        request_channel = await amqp.channel()

        try:
            outgoing = Message(body=self.request.body)
            await request_channel.default_exchange.publish(
                outgoing, routing_key="test",
            )
        finally:
            await request_channel.close()

        await self.finish("OK")


async def make_app() -> tornado.web.Application:
    """Connect to the broker, start consuming and build the Tornado app.

    Messages consumed from the auto-delete ``"test"`` queue are put onto
    ``Base.QUEUE``. The AMQP connection is stored in the application
    settings under ``amqp_connection``.
    """
    amqp_connection = await connect_robust()
    consumer_channel = await amqp_connection.channel()
    test_queue = await consumer_channel.declare_queue("test", auto_delete=True)

    # The in-process queue must exist before its ``put`` method is
    # registered as the consumer callback.
    Base.QUEUE = asyncio.Queue()
    await test_queue.consume(Base.QUEUE.put, no_ack=True)

    routes = [
        (r"/publish", PublisherHandler),
        (r"/subscribe", SubscriberHandler),
    ]
    return tornado.web.Application(routes, amqp_connection=amqp_connection)


async def main() -> None:
    """Build the application, listen on port 8888 and wait indefinitely."""
    application = await make_app()
    application.listen(8888)

    # Nothing ever resolves this future, so awaiting it keeps the server up
    # until the task is terminated.
    never_done = asyncio.Future()
    await never_done


if __name__ == "__main__":
    asyncio.run(main())

External credentials example#

import asyncio
import ssl

import aio_pika
from aio_pika.abc import SSLOptions


async def main() -> None:
    """Connect over TLS with certificate files and publish one message.

    The connection is opened with an empty login and SSL options that point
    to ``cacert.pem``, ``cert.pem`` and ``key.pem``; a greeting is then
    published to ``test_queue`` through the default exchange.
    """
    # NOTE(review): the empty login presumably lets the broker take the
    # identity from the client certificate; confirm against broker config.
    tls_options = SSLOptions(
        cafile="cacert.pem",
        certfile="cert.pem",
        keyfile="key.pem",
        no_verify_ssl=ssl.CERT_REQUIRED,
    )
    properties = {"connection_name": "aio-pika external credentials"}

    connection = await aio_pika.connect_robust(
        host="127.0.0.1",
        login="",
        ssl=True,
        ssl_options=tls_options,
        client_properties=properties,
    )

    async with connection:
        routing_key = "test_queue"
        channel = await connection.channel()

        greeting = aio_pika.Message(
            body="Hello {}".format(routing_key).encode(),
        )
        await channel.default_exchange.publish(
            greeting, routing_key=routing_key,
        )


if __name__ == "__main__":
    asyncio.run(main())

Connection pooling#

import asyncio

import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool


async def main() -> None:
    """Publish 50 messages and consume them using connection/channel pools.

    A pool of at most 2 connections backs a pool of at most 10 channels.
    One consumer task iterates over ``pool_queue`` while 50 publisher tasks
    each acquire a channel from the pool and publish a single message.
    The function then waits on the consumer task.
    """
    # Inside a coroutine the running loop is the one to use;
    # get_running_loop() states that explicitly.
    loop = asyncio.get_running_loop()

    async def get_connection() -> AbstractRobustConnection:
        """Pool factory: open a new robust connection."""
        return await aio_pika.connect_robust("amqp://guest:guest@localhost/")

    connection_pool: Pool = Pool(get_connection, max_size=2, loop=loop)

    async def get_channel() -> aio_pika.Channel:
        """Pool factory: open a channel on a pooled connection."""
        async with connection_pool.acquire() as connection:
            return await connection.channel()

    channel_pool: Pool = Pool(get_channel, max_size=10, loop=loop)
    queue_name = "pool_queue"

    async def consume() -> None:
        """Print and acknowledge every message arriving on the queue."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.set_qos(10)

            queue = await channel.declare_queue(
                queue_name, durable=False, auto_delete=False,
            )

            async with queue.iterator() as queue_iter:
                async for message in queue_iter:
                    print(message)
                    await message.ack()

    async def publish() -> None:
        """Publish one message naming the channel it was sent through."""
        async with channel_pool.acquire() as channel:  # type: aio_pika.Channel
            await channel.default_exchange.publish(
                aio_pika.Message(("Channel: %r" % channel).encode()),
                queue_name,
            )

    async with connection_pool, channel_pool:
        task = loop.create_task(consume())

        # asyncio.wait() no longer accepts bare coroutines (TypeError on
        # Python 3.11+), so each publisher is wrapped in a task first.
        publishers = [loop.create_task(publish()) for _ in range(50)]
        await asyncio.wait(publishers)

        await task


if __name__ == "__main__":
    asyncio.run(main())