Quick start

Some useful examples.

Simple consumer

import asyncio
import aio_pika


async def main(loop):
    """Consume ``test_queue`` until a message body contains the queue name.

    Connects to the broker at ``127.0.0.1`` with the guest account, declares
    the queue with ``auto_delete=True`` and prints every message body it
    receives.  Iteration stops after the first message whose decoded body
    contains ``queue.name``.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    queue_name = "test_queue"

    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    async with connection:
        # One channel is enough for this example
        channel = await connection.channel()

        # Declare the queue we are going to read from
        queue = await channel.declare_queue(queue_name, auto_delete=True)

        async for message in queue:
            # NOTE(review): newer aio-pika releases appear to expose
            # ``process()`` as an *async* context manager -- confirm against
            # the installed version before changing this.
            with message.process():
                payload = message.body
                print(payload)

                # Stop consuming once a message mentions the queue name
                if queue.name in payload.decode():
                    break


if __name__ == "__main__":
    # Run the consumer coroutine to completion, then release the loop.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main(event_loop))
    event_loop.close()

Simple publisher

import asyncio
import aio_pika


async def main(loop):
    """Publish a single ``Hello test_queue`` message to the default exchange.

    The message is routed with the key ``test_queue``.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    routing_key = "test_queue"

    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    async with connection:
        channel = await connection.channel()

        # Message bodies are bytes, so encode the greeting before sending
        payload = 'Hello {}'.format(routing_key).encode()

        await channel.default_exchange.publish(
            aio_pika.Message(body=payload),
            routing_key=routing_key,
        )


if __name__ == "__main__":
    # Run the publisher coroutine to completion, then release the loop.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main(event_loop))
    event_loop.close()

Working with RabbitMQ transactions

import asyncio
import aio_pika


async def main(loop):
    """Publish to ``test_queue`` using channel transactions in three ways.

    1. Ten messages published inside ``async with channel.transaction()``.
    2. One message inside a manually selected transaction that is committed.
    3. One message inside a manually selected transaction that is rolled
       back.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    routing_key = "test_queue"

    connection = await aio_pika.connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    async with connection:
        # Transactions conflict with `publisher_confirms`, so the channel is
        # opened with confirms switched off.
        channel = await connection.channel(publisher_confirms=False)

        # 1. Transaction driven by the async context manager.
        async with channel.transaction():
            # Messages are published here, but delivery will not be done
            # before this transaction is committed.
            for index in range(10):
                body = 'Hello #{}'.format(index).encode()

                await channel.default_exchange.publish(
                    aio_pika.Message(body=body),
                    routing_key=routing_key,
                )

        # 2. Transaction driven manually: select, publish, commit.
        committed_tx = channel.transaction()
        await committed_tx.select()

        greeting = 'Hello {}'.format(routing_key).encode()
        await channel.default_exchange.publish(
            aio_pika.Message(body=greeting),
            routing_key=routing_key,
        )

        await committed_tx.commit()
        committed_tx.close()

        # 3. Transaction driven manually: select, publish, roll back.
        rolled_back_tx = channel.transaction()
        await rolled_back_tx.select()

        await channel.default_exchange.publish(
            aio_pika.Message(body='Should be rejected'.encode()),
            routing_key=routing_key,
        )

        await rolled_back_tx.rollback()
        rolled_back_tx.close()


if __name__ == "__main__":
    # Run the transaction example to completion, then release the loop.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main(event_loop))
    event_loop.close()

Get single message example

import asyncio
from aio_pika import connect_robust, Message


async def main(loop):
    """Publish one message and fetch it back with a single ``queue.get``.

    Declares the ``'direct'`` exchange and ``test_queue`` (both with
    ``auto_delete=True``), binds them with the routing key ``test_queue``,
    publishes a ``text/plain`` message, reads it back with a 5 second
    timeout, acknowledges it and finally unbinds and deletes the queue.

    :param loop: event loop handed to ``connect_robust``.
    """
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/", loop=loop
    )

    queue_name = "test_queue"
    routing_key = "test_queue"

    # ``async with`` closes the connection even when one of the steps below
    # raises (for example when ``queue.get`` fails); previously the
    # connection was closed only on the success path.
    async with connection:
        # Creating channel
        channel = await connection.channel()

        # Declaring exchange
        exchange = await channel.declare_exchange(
            'direct', auto_delete=True
        )

        # Declaring queue
        queue = await channel.declare_queue(
            queue_name, auto_delete=True
        )

        # Binding queue
        await queue.bind(exchange, routing_key)

        await exchange.publish(
            Message(
                bytes('Hello', 'utf-8'),
                content_type='text/plain',
                headers={'foo': 'bar'}
            ),
            routing_key
        )

        # Receiving message
        incoming_message = await queue.get(timeout=5)

        # Confirm message
        # NOTE(review): newer aio-pika releases appear to make ``ack()`` a
        # coroutine -- confirm against the installed version.
        incoming_message.ack()

        await queue.unbind(exchange, routing_key)
        await queue.delete()


if __name__ == "__main__":
    # Run the example to completion and then close the loop, as the other
    # asyncio examples in this document do.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main(loop))
    loop.close()

Tornado example

import asyncio
import tornado.ioloop
import tornado.web

from aio_pika import connect_robust, Message

# Select Tornado's asyncio-based IOLoop implementation and make the asyncio
# loop that backs it the current asyncio event loop.
# NOTE(review): these statements look order-sensitive (configure before
# ``current()``, loop installed before the queue is created) -- keep the
# order unless verified otherwise.
tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop')
io_loop = tornado.ioloop.IOLoop.current()
asyncio.set_event_loop(io_loop.asyncio_loop)


# In-process buffer: ``make_app`` registers ``QUEUE.put`` as the consumer
# callback and ``SubscriberHandler.get`` awaits ``QUEUE.get()``.
QUEUE = asyncio.Queue()


class SubscriberHandler(tornado.web.RequestHandler):
    """GET handler that replies with the body of the next queued message."""

    async def get(self):
        # Waits until the consumer callback has put a message into QUEUE
        incoming = await QUEUE.get()
        body = incoming.body
        self.finish(body)


class PublisherHandler(tornado.web.RequestHandler):
    """POST handler that publishes the request body with routing key ``test``.

    The AMQP connection is read from the application settings under the
    ``amqp_connection`` key; a channel is opened per request and always
    closed afterwards.
    """

    async def post(self):
        amqp = self.application.settings['amqp_connection']
        channel = await amqp.channel()

        try:
            outgoing = Message(body=self.request.body)
            await channel.default_exchange.publish(
                outgoing, routing_key="test"
            )
        finally:
            # Close the per-request channel whether or not publishing worked
            await channel.close()

        self.finish("OK")


async def make_app():
    """Build the Tornado application and start feeding ``QUEUE``.

    Opens an AMQP connection with ``connect_robust()`` called without
    arguments, declares the ``test`` queue (``auto_delete=True``) and
    registers ``QUEUE.put`` as its consumer callback with ``no_ack=True``.

    :return: a ``tornado.web.Application`` serving ``/publish`` and
        ``/subscribe``, with the connection stored in its settings as
        ``amqp_connection``.
    """
    connection = await connect_robust()

    consumer_channel = await connection.channel()
    test_queue = await consumer_channel.declare_queue(
        'test', auto_delete=True
    )
    await test_queue.consume(QUEUE.put, no_ack=True)

    routes = [
        (r"/publish", PublisherHandler),
        (r"/subscribe", SubscriberHandler),
    ]
    return tornado.web.Application(routes, amqp_connection=connection)


if __name__ == "__main__":
    # Build the application on the asyncio loop that backs Tornado's IOLoop
    asyncio_loop = io_loop.asyncio_loop
    application = asyncio_loop.run_until_complete(make_app())
    application.listen(8888)

    # Serve requests until the IOLoop is stopped
    tornado.ioloop.IOLoop.current().start()

External credentials example

import asyncio
import aio_pika
import ssl

async def main(loop):
    """Publish ``Hello test_queue`` over a TLS connection with an empty login.

    The connection is opened with ``ssl=True`` and certificate options
    (CA bundle, client certificate and key, ``ssl.CERT_REQUIRED``).
    NOTE(review): the empty ``login`` presumably lets the broker identify the
    client from its certificate -- confirm against the broker configuration.

    :param loop: event loop handed to ``aio_pika.connect_robust``.
    """
    routing_key = "test_queue"

    tls_options = {
        "ca_certs": "cacert.pem",
        "certfile": "cert.pem",
        "keyfile": "key.pem",
        "cert_reqs": ssl.CERT_REQUIRED,
    }

    connection = await aio_pika.connect_robust(
        host='127.0.0.1',
        login='',
        ssl=True,
        ssl_options=tls_options,
        loop=loop,
    )

    async with connection:
        channel = await connection.channel()

        payload = 'Hello {}'.format(routing_key).encode()

        await channel.default_exchange.publish(
            aio_pika.Message(body=payload),
            routing_key=routing_key,
        )


if __name__ == "__main__":
    # Run the TLS publisher coroutine to completion, then release the loop.
    event_loop = asyncio.get_event_loop()
    event_loop.run_until_complete(main(event_loop))
    event_loop.close()