Routing

Warning

This is a beta version of the port of the official tutorial. If you find an error, please open an issue or a pull request.

Note

Using the aio-pika async Python client

Note

Prerequisites

This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). If you use a different host, port or credentials, the connection settings will need adjusting.

Where to get help

If you’re having trouble going through this tutorial you can contact us through the mailing list.

In the previous tutorial we built a simple logging system. We were able to broadcast log messages to many receivers.

In this tutorial we’re going to add a feature to it — we’re going to make it possible to subscribe only to a subset of the messages. For example, we will be able to direct only critical error messages to the log file (to save disk space), while still being able to print all of the log messages on the console.

Bindings

In previous examples we were already creating bindings. You may recall code like:

async def main():
    ...

    # Binding the queue to the exchange
    await queue.bind(logs_exchange)

...

A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.

Bindings can take an extra routing_key parameter. To avoid confusion with the routing_key parameter used when publishing a message, we’re going to call it a binding key. This is how we could create a binding with a key:

async def main():
    ...

    # Binding the queue to the exchange
    await queue.bind(logs_exchange,
                     routing_key="black")

...

The meaning of a binding key depends on the exchange type. Fanout exchanges, which we used previously, simply ignore its value.

Direct exchange

Our logging system from the previous tutorial broadcasts all messages to all consumers. We want to extend that to allow filtering messages based on their severity. For example we may want the script which is writing log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.

We were using a fanout exchange, which doesn’t give us too much flexibility — it’s only capable of mindless broadcasting.

We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple — a message goes to the queues whose binding key exactly matches the routing key of the message.

To illustrate that, consider the following setup:

../_images/direct-exchange.svg

In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key black and the other one with green.

In such a setup a message published to the exchange with a routing key orange will be routed to queue Q1. Messages with a routing key of black or green will go to Q2. All other messages will be discarded.
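The routing rule described above can be sketched in plain Python. This is only a toy in-memory model, not aio-pika code: the class name `DirectExchangeModel` and its methods are invented for illustration, and queues are represented by their names.

```python
from collections import defaultdict


class DirectExchangeModel:
    """Toy in-memory stand-in for a direct exchange (not aio-pika)."""

    def __init__(self) -> None:
        # binding key -> names of the queues bound with that key
        self._bindings = defaultdict(list)

    def bind(self, queue: str, binding_key: str) -> None:
        if queue not in self._bindings[binding_key]:
            self._bindings[binding_key].append(queue)

    def route(self, routing_key: str) -> list:
        # Exact match only; an unmatched key yields no queues,
        # which models the message being discarded.
        return list(self._bindings.get(routing_key, []))


# Same setup as in the figure: Q1 <- orange, Q2 <- black and green.
x = DirectExchangeModel()
x.bind("Q1", "orange")
x.bind("Q2", "black")
x.bind("Q2", "green")

print(x.route("orange"))  # ['Q1']
print(x.route("green"))   # ['Q2']
print(x.route("purple"))  # []
```

An empty result corresponds to the "discarded" case: no binding key matches, so no queue receives the message.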

Multiple bindings

../_images/direct-exchange-multiple.svg

It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.
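As a rough illustration of that case, the sketch below keeps the bindings as plain (queue, binding key) pairs and looks up every queue that matches a routing key. The `bindings` list and the `matching_queues` helper are hypothetical names used only for this example; the real matching happens inside the broker.

```python
# Each binding is a (queue name, binding key) pair.
bindings = [
    ("Q1", "orange"),
    ("Q1", "black"),  # the extra binding described above
    ("Q2", "black"),
    ("Q2", "green"),
]


def matching_queues(routing_key: str) -> list:
    """Return the queues whose binding key equals the routing key."""
    matched = []
    for queue, binding_key in bindings:
        if binding_key == routing_key and queue not in matched:
            matched.append(queue)
    return matched


print(matching_queues("black"))   # ['Q1', 'Q2']
print(matching_queues("orange"))  # ['Q1']
```

For the key black the result is the same as a fanout broadcast to both queues, while other keys are still filtered.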

Emitting logs

We’ll use this model for our logging system. Instead of fanout we’ll send messages to a direct exchange. We will supply the log severity as a routing key. That way the receiving script will be able to select the severity it wants to receive. Let’s focus on emitting logs first.

As always, we need to create an exchange first:

from aio_pika import ExchangeType

async def main():
    ...

    direct_logs_exchange = await channel.declare_exchange(
        'logs', ExchangeType.DIRECT
    )

And we’re ready to send a message:

async def main():
    ...

    await direct_logs_exchange.publish(
        Message(message_body),
        routing_key=severity,
    )

To simplify things we will assume that ‘severity’ can be one of ‘info’, ‘warning’, ‘error’.
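One possible way to turn command-line arguments into a severity and a message body is sketched below. The helper name `parse_emit_args` and the `ALLOWED_SEVERITIES` tuple are assumptions made for this example, and the validation step is an addition: RabbitMQ itself accepts any string as a routing key.

```python
ALLOWED_SEVERITIES = ("info", "warning", "error")


def parse_emit_args(argv: list) -> tuple:
    """Split command-line arguments into (severity, message body)."""
    # argv[0] is the script name; the severity argument is optional.
    severity = argv[1] if len(argv) > 1 else "info"
    if severity not in ALLOWED_SEVERITIES:
        raise ValueError(f"unknown severity: {severity!r}")
    # Everything after the severity forms the message text.
    body = " ".join(argv[2:]).encode() or b"Hello World!"
    return severity, body


print(parse_emit_args(["emit_log_direct.py", "error", "Disk", "full"]))
# ('error', b'Disk full')
print(parse_emit_args(["emit_log_direct.py"]))
# ('info', b'Hello World!')
```

The returned severity would then be passed as routing_key when publishing, and the bytes as the message body.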

Subscribing

Receiving messages will work just like in the previous tutorial, with one exception - we’re going to create a new binding for each severity we’re interested in.

async def main():
    ...

    # Declaring queue
    queue = await channel.declare_queue(exclusive=True)

    # Binding the queue to the exchange
    await queue.bind(direct_logs_exchange,
                     routing_key=severity)

...
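When a consumer is interested in several severities, the binding call is simply repeated once per severity. The sketch below wraps that loop in a hypothetical helper, `bind_severities`, and exercises it against a stub queue (`RecordingQueue`, also invented here) so that it runs without a broker. With aio-pika you would pass the real queue and exchange objects instead.

```python
import asyncio


async def bind_severities(queue, exchange, severities) -> None:
    """Create one binding per severity.

    `queue` is expected to support
    `await queue.bind(exchange, routing_key=...)`,
    as the queue object used in this tutorial does.
    """
    for severity in severities:
        await queue.bind(exchange, routing_key=severity)


class RecordingQueue:
    """Hypothetical stub that records bindings instead of calling RabbitMQ."""

    def __init__(self) -> None:
        self.bound = []

    async def bind(self, exchange, routing_key=None) -> None:
        self.bound.append((exchange, routing_key))


stub = RecordingQueue()
asyncio.run(bind_severities(stub, "logs", ["warning", "error"]))
print(stub.bound)  # [('logs', 'warning'), ('logs', 'error')]
```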

Putting it all together

../_images/python-four.svg

The simplified code for receive_logs_direct_simple.py:

import asyncio
import sys

from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        # Creating a channel
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)

        severities = sys.argv[1:]

        if not severities:
            sys.stderr.write(
                f"Usage: {sys.argv[0]} [info] [warning] [error]\n",
            )
            sys.exit(1)

        # Declare an exchange
        direct_logs_exchange = await channel.declare_exchange(
            "logs", ExchangeType.DIRECT,
        )

        # Declaring a temporary queue with a server-generated name
        queue = await channel.declare_queue(exclusive=True)

        for severity in severities:
            await queue.bind(direct_logs_exchange, routing_key=severity)

        print(" [*] Waiting for messages. To exit press CTRL+C")

        # The loop below runs until the iterator is closed or the task is
        # cancelled, so the banner has to be printed before entering it.
        async with queue.iterator() as iterator:
            message: AbstractIncomingMessage
            async for message in iterator:
                async with message.process():
                    print(f" [x] {message.routing_key!r}:{message.body!r}")


if __name__ == "__main__":
    asyncio.run(main())

The code for emit_log_direct.py:

import asyncio
import sys

from aio_pika import DeliveryMode, ExchangeType, Message, connect


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        # Creating a channel
        channel = await connection.channel()

        logs_exchange = await channel.declare_exchange(
            "logs", ExchangeType.DIRECT,
        )

        message_body = b" ".join(
            arg.encode() for arg in sys.argv[2:]
        ) or b"Hello World!"

        message = Message(
            message_body,
            delivery_mode=DeliveryMode.PERSISTENT,
        )

        # Sending the message; the first argument, if given, is the severity
        routing_key = sys.argv[1] if len(sys.argv) > 1 else "info"
        await logs_exchange.publish(message, routing_key=routing_key)

        print(f" [x] Sent {message.body!r}")


if __name__ == "__main__":
    asyncio.run(main())

Note

The callback-based code for receive_logs_direct.py:

import asyncio
import sys

from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage


async def on_message(message: AbstractIncomingMessage) -> None:
    async with message.process():
        print(" [x] %r:%r" % (message.routing_key, message.body))


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    async with connection:
        # Creating a channel
        channel = await connection.channel()
        await channel.set_qos(prefetch_count=1)

        severities = sys.argv[1:]

        if not severities:
            sys.stderr.write(
                "Usage: %s [info] [warning] [error]\n" % sys.argv[0],
            )
            sys.exit(1)

        # Declare an exchange
        direct_logs_exchange = await channel.declare_exchange(
            "logs", ExchangeType.DIRECT,
        )

        # Declaring a temporary queue with a server-generated name
        queue = await channel.declare_queue(exclusive=True)

        for severity in severities:
            await queue.bind(direct_logs_exchange, routing_key=severity)

        # Start consuming messages from the queue
        await queue.consume(on_message)

        print(" [*] Waiting for messages. To exit press CTRL+C")
        await asyncio.Future()


if __name__ == "__main__":
    asyncio.run(main())

If you want to save only ‘warning’ and ‘error’ (and not ‘info’) log messages to a file, just open a console and type:

$ python receive_logs_direct_simple.py warning error > logs_from_rabbit.log

If you’d like to see all the log messages on your screen, open a new terminal and do:

$ python receive_logs_direct.py info warning error
 [*] Waiting for messages. To exit press CTRL+C

And, for example, to emit an error log message just type:

$ python emit_log_direct.py error "Run. Run. Or it will explode."
 [x] Sent b'Run. Run. Or it will explode.'

Move on to tutorial 5 to find out how to listen for messages based on a pattern.

Note

This material was adapted from the official tutorial on rabbitmq.org.