Welcome to aio-pika’s documentation!#
aio-pika is a wrapper around aiormq for asyncio and humans.
Features#
Completely asynchronous API.
Object oriented API.
Transparent auto-reconnects with complete state recovery with connect_robust (e.g. declared queues or exchanges, consuming state and bindings).
Python 3.6+ compatible.
For Python 3.5 users, aio-pika<7 is available.
Transparent publisher confirms support
Transactions support
Complete type-hint coverage.
Installation#
Installation with pip:
pip install aio-pika
Installation from git:
# via pip
pip install https://github.com/mosquito/aio-pika/archive/master.zip
# manually
git clone https://github.com/mosquito/aio-pika.git
cd aio-pika
python setup.py install
Development#
Clone the project:
git clone https://github.com/mosquito/aio-pika.git
cd aio-pika
Create a new virtualenv for aio-pika:
virtualenv -p python3 env
Install all requirements for aio-pika:
env/bin/pip install -e '.[develop]'
Table Of Contents#
Quick start#
Some useful examples.
Simple consumer#
import asyncio
import logging
import aio_pika
async def main() -> None:
logging.basicConfig(level=logging.DEBUG)
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/",
)
queue_name = "test_queue"
async with connection:
# Creating channel
channel = await connection.channel()
# Will take no more than 10 messages in advance
await channel.set_qos(prefetch_count=10)
# Declaring queue
queue = await channel.declare_queue(queue_name, auto_delete=True)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print(message.body)
if queue.name in message.body.decode():
break
if __name__ == "__main__":
asyncio.run(main())
Simple publisher#
import asyncio
import aio_pika
async def main() -> None:
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/",
)
async with connection:
routing_key = "test_queue"
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(body=f"Hello {routing_key}".encode()),
routing_key=routing_key,
)
if __name__ == "__main__":
asyncio.run(main())
Asynchronous message processing#
import asyncio
import aio_pika
async def process_message(
message: aio_pika.abc.AbstractIncomingMessage,
) -> None:
async with message.process():
print(message.body)
await asyncio.sleep(1)
async def main() -> None:
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/",
)
queue_name = "test_queue"
# Creating channel
channel = await connection.channel()
# Maximum message count which will be processed at the same time.
await channel.set_qos(prefetch_count=100)
# Declaring queue
queue = await channel.declare_queue(queue_name, auto_delete=True)
await queue.consume(process_message)
try:
# Wait until terminate
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
Working with RabbitMQ transactions#
import asyncio
import aio_pika
async def main() -> None:
connection = await aio_pika.connect_robust(
"amqp://guest:guest@127.0.0.1/",
)
async with connection:
routing_key = "test_queue"
# Transactions conflict with `publisher_confirms`
channel = await connection.channel(publisher_confirms=False)
# Use transactions with async context manager
async with channel.transaction():
# Publishing messages but delivery will not be done
# before committing this transaction
for i in range(10):
message = aio_pika.Message(body="Hello #{}".format(i).encode())
await channel.default_exchange.publish(
message, routing_key=routing_key,
)
# Using transactions manually
tx = channel.transaction()
# start transaction manually
await tx.select()
await channel.default_exchange.publish(
aio_pika.Message(body="Hello {}".format(routing_key).encode()),
routing_key=routing_key,
)
await tx.commit()
# Using transactions manually
tx = channel.transaction()
# start transaction manually
await tx.select()
await channel.default_exchange.publish(
aio_pika.Message(body="Should be rejected".encode()),
routing_key=routing_key,
)
await tx.rollback()
if __name__ == "__main__":
asyncio.run(main())
Get single message example#
import asyncio
from aio_pika import Message, connect_robust
from aio_pika.abc import AbstractIncomingMessage
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/?name=aio-pika%20example",
)
queue_name = "test_queue"
routing_key = "test_queue"
# Creating channel
channel = await connection.channel()
# Declaring exchange
exchange = await channel.declare_exchange("direct", auto_delete=True)
# Declaring queue
queue = await channel.declare_queue(queue_name, auto_delete=True)
# Binding queue
await queue.bind(exchange, routing_key)
await exchange.publish(
Message(
bytes("Hello", "utf-8"),
content_type="text/plain",
headers={"foo": "bar"},
),
routing_key,
)
# Receiving message
if incoming_message := await queue.get(timeout=5, fail=False):
# Confirm message
await incoming_message.ack()
else:
print("Queue empty")
await queue.unbind(exchange, routing_key)
await queue.delete()
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
Set logging level#
Sometimes you want to see only your own debug logs, but when you call logging.basicConfig(logging.DEBUG) you set the debug log level for all loggers, including all of aio_pika's modules. If you want to set the logging level independently, see the following example:
import logging
from aio_pika import logger
logger.setLevel(logging.ERROR)
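If you still want verbose output from your own code, a minimal sketch combining both approaches could look like this (the my_app logger name is only an illustration):
import logging

from aio_pika import logger as aio_pika_logger

# Verbose logging for the whole application...
logging.basicConfig(level=logging.DEBUG)

# ...but keep aio-pika quiet unless something goes seriously wrong
aio_pika_logger.setLevel(logging.ERROR)

# Loggers of your own modules still emit DEBUG records
logging.getLogger("my_app").debug("debug message from the application")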
Tornado example#
import asyncio
import tornado.ioloop
import tornado.web
from aio_pika import Message, connect_robust
class Base:
QUEUE: asyncio.Queue
class SubscriberHandler(tornado.web.RequestHandler, Base):
async def get(self) -> None:
message = await self.QUEUE.get()
await self.finish(message.body)
class PublisherHandler(tornado.web.RequestHandler):
async def post(self) -> None:
connection = self.application.settings["amqp_connection"]
channel = await connection.channel()
try:
await channel.default_exchange.publish(
Message(body=self.request.body), routing_key="test",
)
finally:
await channel.close()
await self.finish("OK")
async def make_app() -> tornado.web.Application:
amqp_connection = await connect_robust()
channel = await amqp_connection.channel()
queue = await channel.declare_queue("test", auto_delete=True)
Base.QUEUE = asyncio.Queue()
await queue.consume(Base.QUEUE.put, no_ack=True)
return tornado.web.Application(
[(r"/publish", PublisherHandler), (r"/subscribe", SubscriberHandler)],
amqp_connection=amqp_connection,
)
async def main() -> None:
app = await make_app()
app.listen(8888)
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
External credentials example#
import asyncio
import ssl
import aio_pika
from aio_pika.abc import SSLOptions
async def main() -> None:
connection = await aio_pika.connect_robust(
host="127.0.0.1",
login="",
ssl=True,
ssl_options=SSLOptions(
cafile="cacert.pem",
certfile="cert.pem",
keyfile="key.pem",
no_verify_ssl=ssl.CERT_REQUIRED,
),
client_properties={"connection_name": "aio-pika external credentials"},
)
async with connection:
routing_key = "test_queue"
channel = await connection.channel()
await channel.default_exchange.publish(
aio_pika.Message(body="Hello {}".format(routing_key).encode()),
routing_key=routing_key,
)
if __name__ == "__main__":
asyncio.run(main())
Connection pooling#
import asyncio
import aio_pika
from aio_pika.abc import AbstractRobustConnection
from aio_pika.pool import Pool
async def main() -> None:
loop = asyncio.get_event_loop()
async def get_connection() -> AbstractRobustConnection:
return await aio_pika.connect_robust("amqp://guest:guest@localhost/")
connection_pool: Pool = Pool(get_connection, max_size=2, loop=loop)
async def get_channel() -> aio_pika.Channel:
async with connection_pool.acquire() as connection:
return await connection.channel()
channel_pool: Pool = Pool(get_channel, max_size=10, loop=loop)
queue_name = "pool_queue"
async def consume() -> None:
async with channel_pool.acquire() as channel: # type: aio_pika.Channel
await channel.set_qos(10)
queue = await channel.declare_queue(
queue_name, durable=False, auto_delete=False,
)
async with queue.iterator() as queue_iter:
async for message in queue_iter:
print(message)
await message.ack()
async def publish() -> None:
async with channel_pool.acquire() as channel: # type: aio_pika.Channel
await channel.default_exchange.publish(
aio_pika.Message(("Channel: %r" % channel).encode()),
queue_name,
)
async with connection_pool, channel_pool:
task = loop.create_task(consume())
await asyncio.wait([publish() for _ in range(50)])
await task
if __name__ == "__main__":
asyncio.run(main())
Patterns and helpers#
Note
Available since aio-pika>=1.7.0
aio-pika includes some useful patterns for creating distributed systems.
Master/Worker#
A helper that implements the Master/Worker pattern. This is applicable for balancing tasks between multiple workers.
The master creates tasks:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/?name=aio-pika%20master",
)
async with connection:
# Creating channel
channel = await connection.channel()
master = Master(channel)
# Creates tasks by proxy object
for task_id in range(1000):
await master.proxy.my_task_name(task_id=task_id)
# Or using create_task method
for task_id in range(1000):
await master.create_task(
"my_task_name", kwargs=dict(task_id=task_id),
)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Worker code:
import asyncio
from aio_pika import connect_robust
from aio_pika.abc import AbstractConnection
from aio_pika.patterns import Master, NackMessage, RejectMessage
async def worker(*, task_id: int) -> None:
# If you want to reject the message or send a
# nack, you can raise a special exception
if task_id % 2 == 0:
raise RejectMessage(requeue=False)
if task_id % 2 == 1:
raise NackMessage(requeue=False)
print(task_id)
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/?name=aio-pika%20worker",
)
# Creating channel
channel = await connection.channel()
# Initializing Master with channel
master = Master(channel)
await master.create_worker("my_task_name", worker, auto_delete=True)
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
One or multiple workers execute the tasks.
RPC#
A helper that implements the Remote Procedure Call pattern. This is applicable for balancing tasks between multiple workers.
The caller creates tasks and awaits the results:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/",
client_properties={"connection_name": "caller"},
)
async with connection:
# Creating channel
channel = await connection.channel()
rpc = await RPC.create(channel)
# Call the remote procedure via the proxy object
for i in range(1000):
print(await rpc.proxy.multiply(x=100, y=i))
# Or using the call method
for i in range(1000):
print(await rpc.call("multiply", kwargs=dict(x=100, y=i)))
if __name__ == "__main__":
asyncio.run(main())
One or multiple callees execute the tasks:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def multiply(*, x: int, y: int) -> int:
return x * y
async def main() -> None:
connection = await connect_robust(
"amqp://guest:guest@127.0.0.1/",
client_properties={"connection_name": "callee"},
)
# Creating channel
channel = await connection.channel()
rpc = await RPC.create(channel)
await rpc.register("multiply", multiply, auto_delete=True)
try:
await asyncio.Future()
finally:
await connection.close()
if __name__ == "__main__":
asyncio.run(main())
Extending#
The serialization behaviour of both patterns can be changed by subclassing and redefining the methods aio_pika.patterns.base.serialize() and aio_pika.patterns.base.deserialize().
The following example demonstrates this:
from typing import Any
import msgpack # type: ignore
from aio_pika.patterns import RPC, Master
class MsgpackRPC(RPC):
CONTENT_TYPE = "application/msgpack"
def serialize(self, data: Any) -> bytes:
return msgpack.dumps(data)
def deserialize(self, data: bytes) -> Any:
return msgpack.loads(data)
class MsgpackMaster(Master):
CONTENT_TYPE = "application/msgpack"
def serialize(self, data: Any) -> bytes:
return msgpack.dumps(data)
def deserialize(self, data: bytes) -> Any:
return msgpack.loads(data)
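The subclasses are used exactly like their parents. A short usage sketch (assuming an already opened channel, a multiply coroutine as in the RPC example above, and that msgpack is installed):
# Register an RPC callee that speaks msgpack
rpc = await MsgpackRPC.create(channel)
await rpc.register("multiply", multiply, auto_delete=True)

# Create Master/Worker tasks serialized with msgpack
master = MsgpackMaster(channel)
await master.create_task("my_task_name", kwargs=dict(task_id=1))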
RabbitMQ tutorial#
Introduction#
Warning
This is a beta version of the port of the official tutorial. If you find an error, please create an issue or a pull request.
It is expected that you are familiar with the basics of asyncio; if not, we recommend reading an asyncio tutorial first. Either way, the following examples work as written, so feel free to download and run them as-is, without any changes (provided your RabbitMQ installation allows access for the user "guest").
Note
Prerequisites
This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). If you use a different host, port or credentials, the connection settings will require adjusting.
Where to get help
If you’re having trouble going through this tutorial you can contact us through the mailing list.
RabbitMQ is a message broker. The principal idea is pretty simple: it accepts and forwards messages. You can think about it as a post office: when you send mail to the post box you’re pretty sure that Mr. Postman will eventually deliver the mail to your recipient. Using this metaphor RabbitMQ is a post box, a post office and a postman.
The major difference between RabbitMQ and the post office is the fact that it doesn’t deal with paper, instead it accepts, stores and forwards binary blobs of data ‒ messages.
RabbitMQ, and messaging in general, uses some jargon.
Producing means nothing more than sending. A program that sends messages is a producer.
We’ll draw it like that, with “P”:
A queue is the name for a mailbox. It lives inside RabbitMQ. Although messages flow through RabbitMQ and your applications, they can be stored only inside a queue. A queue is not bound by any limits, it can store as many messages as you like ‒ it’s essentially an infinite buffer. Many producers can send messages that go to one queue, many consumers can try to receive data from one queue.
A queue will be drawn like this, with its name above it:
Consuming has a similar meaning to receiving. A consumer is a program that mostly waits to receive messages.
On our drawings it’s shown with “C”:
Note
Note that the producer, consumer, and broker do not have to reside on the same machine; indeed in most applications they don’t.
Hello World!#
Note
Using the aio-pika async Python client
Our “Hello world” won’t be too complex ‒ let’s send a message, receive it and print it on the screen. To do so we need two programs: one that sends a message and one that receives and prints it.
Our overall design will look like:
Producer sends messages to the “hello” queue. The consumer receives messages from that queue.
Note
RabbitMQ libraries
RabbitMQ speaks AMQP 0.9.1, which is an open, general-purpose protocol for messaging. There are a number of clients for RabbitMQ in many different languages. In this tutorial series we’re going to use aio-pika, which is the Python client recommended by the RabbitMQ team. To install it you can use the pip package management tool.
Sending#
Our first program send.py will send a single message to the queue. The first thing we need to do is to establish a connection with the RabbitMQ server.
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
We’re connected now, to a broker on the local machine - hence the localhost. If we wanted to connect to a broker on a different machine we’d simply specify its name or IP address here.
Next, before sending we need to make sure the recipient queue exists. If we send a message to a non-existing location, RabbitMQ will just trash the message. Let's create a queue to which the message will be delivered and name it hello:
# Declaring queue
queue = await channel.declare_queue("hello")
At that point we’re ready to send a message. Our first message will just contain a string Hello World! and we want to send it to our hello queue.
In RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. But let’s not get dragged down by the details ‒ you can read more about exchanges in the third part of this tutorial. All we need to know now is how to use a default exchange identified by an empty string. This exchange is special ‒ it allows us to specify exactly to which queue the message should go. The queue name needs to be specified in the routing_key parameter:
# Sending the message
await channel.default_exchange.publish(
Message(b"Hello World!"),
routing_key=queue.name,
)
Before exiting the program we need to make sure the network buffers were flushed and our message was actually delivered to RabbitMQ. We can do this by gently closing the connection. In this example, an async context manager is used.
async with connection:
# Creating a channel
channel = await connection.channel()
Note
Sending doesn’t work!
If this is your first time using RabbitMQ and you don’t see the “Sent” message then you may be left scratching your head wondering what could be wrong. Maybe the broker was started without enough free disk space (by default it needs at least 1Gb free) and is therefore refusing to accept messages. Check the broker logfile to confirm and reduce the limit if necessary. The configuration file documentation will show you how to set disk_free_limit.
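For reference, lowering the limit is a single line in rabbitmq.conf (new-style configuration format; the value here is only an example):
disk_free_limit.absolute = 50MB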
Receiving#
Our second program receive.py will receive messages from the queue and print them on the screen.
Again, we first need to connect to the RabbitMQ server. The code responsible for connecting to Rabbit is the same as before.
The next step, just like before, is to make sure that the queue exists. Creating a queue using declare_queue is idempotent ‒ we can run the command as many times as we like, and only one queue will be created.
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue("hello")
You may ask why we declare the queue again ‒ we have already declared it in our previous code. We could avoid that if we were sure that the queue already exists, for example if the send.py program had been run before. But we're not yet sure which program to run first. In such cases it's good practice to repeat declaring the queue in both programs.
Note
Listing queues
You may wish to see what queues RabbitMQ has and how many messages are in them. You can do it (as a privileged user) using the rabbitmqctl tool:
$ sudo rabbitmqctl list_queues
Listing queues ...
hello 0
...done.
(omit sudo on Windows)
Receiving messages from the queue is simple. It works by subscribing a callback function to a queue or by using a simple get.
Whenever we receive a message, this callback function is called by the aio-pika library. In our case this function will print on the screen the contents of the message.
async def on_message(message: AbstractIncomingMessage) -> None:
"""
on_message doesn't necessarily have to be defined as async.
Here it is to show that it's possible.
"""
print(" [x] Received message %r" % message)
print("Message body is: %r" % message.body)
print("Before sleep!")
await asyncio.sleep(5) # Represents async I/O operations
print("After sleep!")
Next, we need to tell RabbitMQ that this particular callback function should receive messages from our hello queue:
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue("hello")
# Start listening to the queue with name 'hello'
await queue.consume(on_message, no_ack=True)
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
The no_ack parameter will be described later on.
Putting it all together#
Full code for send.py:
import asyncio
from aio_pika import Message, connect
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue("hello")
# Sending the message
await channel.default_exchange.publish(
Message(b"Hello World!"),
routing_key=queue.name,
)
print(" [x] Sent 'Hello World!'")
if __name__ == "__main__":
asyncio.run(main())
Full receive.py code:
import asyncio
from aio_pika import connect
from aio_pika.abc import AbstractIncomingMessage
async def on_message(message: AbstractIncomingMessage) -> None:
"""
on_message doesn't necessarily have to be defined as async.
Here it is to show that it's possible.
"""
print(" [x] Received message %r" % message)
print("Message body is: %r" % message.body)
print("Before sleep!")
await asyncio.sleep(5) # Represents async I/O operations
print("After sleep!")
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
# Declaring queue
queue = await channel.declare_queue("hello")
# Start listening to the queue with name 'hello'
await queue.consume(on_message, no_ack=True)
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
Now we can try out our programs in a terminal. First, let’s send a message using our send.py program:
$ python send.py
[x] Sent 'Hello World!'
The producer program send.py will stop after every run. Let’s receive it:
$ python receive.py
[x] Received message IncomingMessage:{
"app_id": null,
"body_size": 12,
"cluster_id": null,
"consumer_tag": "ctag1.11fa33f5f4fa41f6a6488648181656e0",
"content_encoding": null,
"content_type": null,
"correlation_id": "b'None'",
"delivery_mode": 1,
"delivery_tag": 1,
"exchange": "",
"expiration": null,
"headers": null,
"message_id": null,
"priority": null,
"redelivered": false,
"reply_to": null,
"routing_key": "hello",
"synchronous": false,
"timestamp": null,
"type": "None",
"user_id": null
}
Message body is: b'Hello World!'
Hurray! We were able to send our first message through RabbitMQ. As you might have noticed, the receive.py program doesn’t exit. It will stay ready to receive further messages, and may be interrupted with Ctrl-C.
Try to run send.py again in a new terminal.
We’ve learned how to send and receive a message from a named queue. It’s time to move on to part 2 and build a simple work queue.
Note
This material was adapted from the official tutorial on rabbitmq.org.
Work Queues#
Warning
This is a beta version of the port of the official tutorial. If you find an error, please create an issue or a pull request.
This implementation is part of the official tutorial port. Since version 1.7.0 aio-pika has a patterns submodule; you might want to use aio_pika.patterns.Master for real projects.
Note
Using the aio-pika async Python client
Note
Prerequisites
This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). If you use a different host, port or credentials, the connection settings will require adjusting.
Where to get help
If you’re having trouble going through this tutorial you can contact us through the mailing list.
In the first tutorial we wrote programs to send and receive messages from a named queue. In this one we’ll create a Work Queue that will be used to distribute time-consuming tasks among multiple workers.
The main idea behind Work Queues (aka: Task Queues) is to avoid doing a resource-intensive task immediately and having to wait for it to complete. Instead we schedule the task to be done later. We encapsulate a task as a message and send it to the queue. A worker process running in the background will pop the tasks and eventually execute the job. When you run many workers the tasks will be shared between them.
This concept is especially useful in web applications where it’s impossible to handle a complex task during a short HTTP request window.
Preparation#
In the previous part of this tutorial we sent a message containing "Hello World!". Now we'll be sending strings that stand for complex tasks. We don't have a real-world task, like images to be resized or pdf files to be rendered, so let's fake it by just pretending we're busy, using the asyncio.sleep() function. We'll take the number of dots in the string as its complexity; every dot will account for one second of "work". For example, a fake task described by Hello... will take three seconds.
We will slightly modify the send.py code from our previous example, to allow arbitrary messages to be sent from the command line. This program will schedule tasks to our work queue, so let’s name it new_task.py:
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
message_body = b" ".join(
arg.encode() for arg in sys.argv[1:]
) or b"Hello World!"
message = Message(
message_body, delivery_mode=DeliveryMode.PERSISTENT,
)
# Sending the message
await channel.default_exchange.publish(
message, routing_key="task_queue",
)
print(f" [x] Sent {message!r}")
Our old receive.py script also requires some changes: it needs to fake a second of work for every dot in the message body. It will pop messages from the queue and perform the task, so let’s call it worker.py:
async def on_message(message: AbstractIncomingMessage) -> None:
async with message.process():
print(f" [x] Received message {message!r}")
print(f" Message body is: {message.body!r}")
Round-robin dispatching#
One of the advantages of using a Task Queue is the ability to easily parallelise work. If we are building up a backlog of work, we can just add more workers and scale easily that way.
First, let’s try to run two worker.py scripts at the same time. They will both get messages from the queue, but how exactly? Let’s see.
You need three consoles open. Two will run the worker.py script. These consoles will be our two consumers - C1 and C2.
shell1$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
shell2$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
In the third one we’ll publish new tasks. Once you’ve started the consumers you can publish a few messages:
shell3$ python new_task.py First message.
shell3$ python new_task.py Second message..
shell3$ python new_task.py Third message...
shell3$ python new_task.py Fourth message....
shell3$ python new_task.py Fifth message.....
Let’s see what is delivered to our workers:
shell1$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
shell2$ python worker.py
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'
By default, RabbitMQ will send each message to the next consumer, in sequence. On average every consumer will get the same number of messages. This way of distributing messages is called round-robin. Try this out with three or more workers.
Message acknowledgment#
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately removes it from memory. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don’t want to lose any tasks. If a worker dies, we’d like the task to be delivered to another worker.
In order to make sure a message is never lost, RabbitMQ supports message acknowledgments. An ack(nowledgement) is sent back from the consumer to tell RabbitMQ that a particular message has been received and processed, and that RabbitMQ is free to delete it.
If a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) without sending an ack, RabbitMQ will understand that a message wasn’t processed fully and will re-queue it. If there are other consumers online at the same time, it will then quickly redeliver it to another consumer. That way you can be sure that no message is lost, even if the workers occasionally die.
There aren’t any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It’s fine even if processing a message takes a very, very long time.
Message acknowledgments are turned on by default. In previous examples we explicitly turned them off via the no_ack=True flag. It’s time to remove this flag and send a proper acknowledgment from the worker, once we’re done with a task.
async def on_message(message: AbstractIncomingMessage) -> None:
    print(" [x] Received %r" % message.body)
    # Fake one second of work for every dot in the message body
    await asyncio.sleep(message.body.count(b"."))
    print(" [x] Done")
    await message.ack()
or by using the special context manager:
async with message.process():
print(f" [x] Received message {message!r}")
print(f" Message body is: {message.body!r}")
If the context manager catches an exception, the message will be returned to the queue.
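Whether a failed message is returned to the queue or rejected can be tuned through the arguments of process(). A minimal sketch (handle_task is a hypothetical helper; parameter names as in recent aio-pika versions):
async with message.process(requeue=True, reject_on_redelivered=True):
    # An exception raised here returns the message to the queue
    # (requeue=True), unless it was already redelivered once, in which
    # case it is rejected instead of bouncing forever.
    await handle_task(message.body)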
Using this code we can be sure that even if you kill a worker using CTRL+C while it was processing a message, nothing will be lost. Soon after the worker dies all unacknowledged messages will be redelivered.
Note
Forgotten acknowledgment
It’s a common mistake to miss the ack. It’s an easy error, but the consequences are serious. Messages will be redelivered when your client quits (which may look like random redelivery), but RabbitMQ will eat more and more memory as it won’t be able to release any unacked messages.
In order to debug this kind of mistake you can use rabbitmqctl to print the messages_unacknowledged field:
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.
Message durability#
We have learned how to make sure that even if the consumer dies, the task isn't lost. But our tasks will still be lost if the RabbitMQ server stops.
When RabbitMQ quits or crashes it will forget the queues and messages unless you tell it not to. Two things are required to make sure that messages aren’t lost: we need to mark both the queue and messages as durable.
First, we need to make sure that RabbitMQ will never lose our queue. In order to do so, we need to declare it as durable:
queue = await channel.declare_queue(
"task_queue",
durable=True,
)
Although this command is correct by itself, it won't work in our setup. That's because we've already defined a queue called hello which is not durable. RabbitMQ doesn't allow you to redefine an existing queue with different parameters and will return an error to any program that tries to do that. But there is a quick workaround - let's declare a queue with a different name, for example task_queue:
# Declaring queue
queue = await channel.declare_queue(
"task_queue",
durable=True,
)
This declare_queue change needs to be applied to both the producer and the consumer code.
At that point we’re sure that the task_queue queue won’t be lost even if RabbitMQ restarts.
Now we need to mark our messages as persistent, by supplying a delivery_mode property with the value PERSISTENT (see the enum aio_pika.DeliveryMode).
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
message_body = b" ".join(
arg.encode() for arg in sys.argv[1:]
) or b"Hello World!"
message = Message(
message_body, delivery_mode=DeliveryMode.PERSISTENT,
)
# Sending the message
await channel.default_exchange.publish(
message, routing_key="task_queue",
)
print(f" [x] Sent {message!r}")
Note
Note on message persistence
Marking messages as persistent doesn’t fully guarantee that a message won’t be lost. Although it tells RabbitMQ to save the message to disk, there is still a short time window when RabbitMQ has accepted a message and hasn’t saved it yet. Also, RabbitMQ doesn’t do fsync(2) for every message – it may be just saved to cache and not really written to the disk. The persistence guarantees aren’t strong, but it’s more than enough for our simple task queue. If you need a stronger guarantee then you can use publisher confirms.
aio-pika supports publisher confirms out of the box.
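A minimal sketch of what this looks like in practice (assuming an open connection as in new_task.py; channels are opened with publisher_confirms=True by default, so awaiting publish() only returns once the broker has confirmed the message):
channel = await connection.channel(publisher_confirms=True)  # the default

confirmation = await channel.default_exchange.publish(
    Message(b"important task", delivery_mode=DeliveryMode.PERSISTENT),
    routing_key="task_queue",
)
# `confirmation` holds the broker's acknowledgement; if the broker cannot
# confirm the message, publish() raises an exception instead of returning.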
Fair dispatch#
You might have noticed that the dispatching still doesn’t work exactly as we want. For example in a situation with two workers, when all odd messages are heavy and even messages are light, one worker will be constantly busy and the other one will do hardly any work. Well, RabbitMQ doesn’t know anything about that and will still dispatch messages evenly.
This happens because RabbitMQ just dispatches a message when the message enters the queue. It doesn’t look at the number of unacknowledged messages for a consumer. It just blindly dispatches every n-th message to the n-th consumer.
In order to defeat that we can use the basic.qos method with the prefetch_count=1 setting. This tells RabbitMQ not to give more than one message to a worker at a time. Or, in other words, don’t dispatch a new message to a worker until it has processed and acknowledged the previous one. Instead, it will dispatch it to the next worker that is not still busy.
async with connection:
# Creating a channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
Note
Note about queue size
If all the workers are busy, your queue can fill up. You will want to keep an eye on that, and maybe add more workers, or have some other strategy.
Putting it all together#
Final code of our new_task.py script:
import asyncio
import sys
from aio_pika import DeliveryMode, Message, connect
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
message_body = b" ".join(
arg.encode() for arg in sys.argv[1:]
) or b"Hello World!"
message = Message(
message_body, delivery_mode=DeliveryMode.PERSISTENT,
)
# Sending the message
await channel.default_exchange.publish(
message, routing_key="task_queue",
)
print(f" [x] Sent {message!r}")
if __name__ == "__main__":
asyncio.run(main())
And our worker.py:
import asyncio
from aio_pika import connect
from aio_pika.abc import AbstractIncomingMessage
async def on_message(message: AbstractIncomingMessage) -> None:
async with message.process():
print(f" [x] Received message {message!r}")
print(f" Message body is: {message.body!r}")
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
# Declaring queue
queue = await channel.declare_queue(
"task_queue",
durable=True,
)
# Start listening to the queue with name 'task_queue'
await queue.consume(on_message)
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
Using message acknowledgments and prefetch_count you can set up a work queue. The durability options let the tasks survive even if RabbitMQ is restarted.
Now we can move on to tutorial 3 and learn how to deliver the same message to many consumers.
Note
This material was adapted from the official tutorial on rabbitmq.org.
Publish/Subscribe#
Warning
This is a beta version of the port of the official tutorial. If you find an error, please create an issue or a pull request.
Note
Using the aio-pika async Python client
Note
Prerequisites
This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). If you use a different host, port or credentials, the connection settings will require adjusting.
Where to get help
If you’re having trouble going through this tutorial you can contact us through the mailing list.
In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we’ll do something completely different — we’ll deliver a message to multiple consumers. This pattern is known as “publish/subscribe”.
To illustrate the pattern, we’re going to build a simple logging system. It will consist of two programs — the first will emit log messages and the second will receive and print them.
In our logging system every running copy of the receiver program will get the messages. That way we’ll be able to run one receiver and direct the logs to disk; and at the same time we’ll be able to run another receiver and see the logs on the screen.
Essentially, published log messages are going to be broadcast to all the receivers.
Exchanges#
In previous parts of the tutorial we sent and received messages to and from a queue. Now it’s time to introduce the full messaging model in Rabbit.
Let’s quickly go over what we covered in the previous tutorials:
A producer is a user application that sends messages.
A queue is a buffer that stores messages.
A consumer is a user application that receives messages.
The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn’t even know if a message will be delivered to any queue at all.
Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers, and on the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it be discarded? The rules for that are defined by the exchange type.
There are a few exchange types available: DIRECT, TOPIC, HEADERS and FANOUT (see aio_pika.ExchangeType).
We’ll focus on the last one — the fanout. Let’s create an exchange of that type, and call it logs:
logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.FANOUT,
)
The fanout exchange is very simple. As you can probably guess from the name, it just broadcasts all the messages it receives to all the queues it knows. And that’s exactly what we need for our logger.
Note
Listing exchanges
To list the exchanges on the server you can run the ever useful rabbitmqctl:
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
logs fanout
amq.direct direct
amq.topic topic
amq.fanout fanout
amq.headers headers
...done.
In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you’ll need to use them at the moment.
Nameless exchange
In previous parts of the tutorial we knew nothing about exchanges, but still were able to send messages to queues. That was possible because we were using a default exchange, which we identify by the empty string (“”).
Recall how we published a message before:
await channel.default_exchange.publish(
Message(message_body),
routing_key='hello',
)
The exchange parameter is the name of the exchange. The empty string denotes the default or nameless exchange: messages are routed to the queue with the name specified by routing_key, if it exists.
Now, we can publish to our named exchange instead:
message_body = b" ".join(
arg.encode() for arg in sys.argv[1:]
) or b"Hello World!"
message = Message(
message_body,
delivery_mode=DeliveryMode.PERSISTENT,
)
# Sending the message
await logs_exchange.publish(message, routing_key="info")
Temporary queues#
As you may remember, previously we were using queues that had specified names (remember hello and task_queue?). Being able to name a queue was crucial for us — we needed to point the workers to the same queue. Giving a queue a name is important when you want to share the queue between producers and consumers.
But that's not the case for our logger. We want to hear about all log messages, not just a subset of them. We're also interested only in the currently flowing messages, not in the old ones. To solve that we need two things.
Firstly, whenever we connect to Rabbit we need a fresh, empty queue. To do this we could create a queue with a random name, or, even better, let the server choose a random queue name for us. We can do this by not supplying a name to declare_queue:
queue = await channel.declare_queue()
Secondly, once we disconnect the consumer the queue should be deleted. There’s an exclusive flag for that:
queue = await channel.declare_queue(exclusive=True)
Bindings#
We’ve already created a fanout exchange and a queue. Now we need to tell the exchange to send messages to our queue. That relationship between an exchange and a queue is called a binding.
logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.FANOUT,
)
# Declaring queue
queue = await channel.declare_queue(exclusive=True)
# Binding the queue to the exchange
await queue.bind(logs_exchange)
From now on the logs exchange will append messages to our queue.
Note
Listing bindings
You can list existing bindings using, you guessed it, rabbitmqctl list_bindings.
Putting it all together#
The producer program, which emits log messages, doesn’t look much different from the previous tutorial.
The most important change is that we now want to publish messages to our logs exchange instead
of the nameless one. We need to supply a routing_key when sending, but its value is ignored
for fanout exchanges.
Here is the code for the emit_log.py script:
import asyncio
import sys
from aio_pika import DeliveryMode, ExchangeType, Message, connect
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.FANOUT,
)
message_body = b" ".join(
arg.encode() for arg in sys.argv[1:]
) or b"Hello World!"
message = Message(
message_body,
delivery_mode=DeliveryMode.PERSISTENT,
)
# Sending the message
await logs_exchange.publish(message, routing_key="info")
print(f" [x] Sent {message!r}")
if __name__ == "__main__":
asyncio.run(main())
As you see, after establishing the connection we declared the exchange. This step is necessary as publishing to a non-existing exchange is forbidden.
The messages will be lost if no queue is bound to the exchange yet, but that’s okay for us; if no consumer is listening yet we can safely discard the message.
The code for the receive_logs.py script:
import asyncio
from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage
async def on_message(message: AbstractIncomingMessage) -> None:
async with message.process():
print(f"[x] {message.body!r}")
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.FANOUT,
)
# Declaring queue
queue = await channel.declare_queue(exclusive=True)
# Binding the queue to the exchange
await queue.bind(logs_exchange)
# Start listening to the queue
await queue.consume(on_message)
print(" [*] Waiting for logs. To exit press CTRL+C")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
We’re done. If you want to save logs to a file, just open a console and type:
$ python receive_logs.py > logs_from_rabbit.log
If you wish to see the logs on your screen, spawn a new terminal and run:
$ python receive_logs.py
And of course, to emit logs type:
$ python emit_log.py
Using rabbitmqctl list_bindings you can verify that the code actually creates bindings and queues as we want. With two receive_logs.py programs running you should see something like:
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.
The interpretation of the result is straightforward: data from exchange logs goes to two queues with server-assigned names. And that’s exactly what we intended.
To find out how to listen for a subset of messages, let's move on to tutorial 4.
Note
This material was adapted from the official tutorial on rabbitmq.org.
Routing#
Warning
This is a beta version of the port of the official tutorial. If you find an error, please create an issue or a pull request.
Note
Using the aio-pika async Python client
Note
Prerequisites
This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). If you use a different host, port or credentials, the connection settings will require adjusting.
Where to get help
If you’re having trouble going through this tutorial you can contact us through the mailing list.
In the previous tutorial we built a simple logging system. We were able to broadcast log messages to many receivers.
In this tutorial we’re going to add a feature to it — we’re going to make it possible to subscribe only to a subset of the messages. For example, we will be able to direct only critical error messages to the log file (to save disk space), while still being able to print all of the log messages on the console.
Bindings#
In previous examples we were already creating bindings. You may recall code like:
async def main():
...
# Binding the queue to the exchange
await queue.bind(logs_exchange)
...
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key. This is how we could create a binding with a key:
async def main():
...
# Binding the queue to the exchange
await queue.bind(logs_exchange,
routing_key="black")
...
The meaning of a binding key depends on the exchange type. The fanout exchange, which we used previously, simply ignores its value.
Direct exchange#
Our logging system from the previous tutorial broadcasts all messages to all consumers. We want to extend that to allow filtering messages based on their severity. For example we may want the script which is writing log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.
We were using a fanout exchange, which doesn’t give us too much flexibility — it’s only capable of mindless broadcasting.
We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple — a message goes to the queues whose binding key exactly matches the routing key of the message.
To illustrate that, consider the following setup:
In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key black and the other one with green.
In such a setup a message published to the exchange with a routing key orange will be routed to queue Q1. Messages with a routing key of black or green will go to Q2. All other messages will be discarded.
Multiple bindings#
It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.
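Expressed in aio-pika, that setup could look like the sketch below (queue_1, queue_2 and direct_logs_exchange are assumed to be already declared):
# Q1 receives "orange" and "black", Q2 receives "black" and "green";
# a message published with routing key "black" reaches both queues.
await queue_1.bind(direct_logs_exchange, routing_key="orange")
await queue_1.bind(direct_logs_exchange, routing_key="black")
await queue_2.bind(direct_logs_exchange, routing_key="black")
await queue_2.bind(direct_logs_exchange, routing_key="green")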
Emitting logs#
We’ll use this model for our logging system. Instead of fanout we’ll send messages to a direct exchange. We will supply the log severity as a routing key. That way the receiving script will be able to select the severity it wants to receive. Let’s focus on emitting logs first.
As always, we need to create an exchange first:
from aio_pika import ExchangeType
async def main():
...
direct_logs_exchange = await channel.declare_exchange(
'logs', ExchangeType.DIRECT
)
And we’re ready to send a message:
async def main():
...
await direct_logs_exchange.publish(
Message(message_body),
routing_key=severity,
)
To simplify things we will assume that ‘severity’ can be one of ‘info’, ‘warning’, ‘error’.
Subscribing#
Receiving messages will work just like in the previous tutorial, with one exception - we’re going to create a new binding for each severity we’re interested in.
async def main():
...
# Declaring queue
queue = await channel.declare_queue(exclusive=True)
# Binding the queue to the exchange
await queue.bind(direct_logs_exchange,
routing_key=severity)
...
Putting it all together#
The simplified code for receive_logs_direct_simple.py:
import asyncio
import sys
from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
severities = sys.argv[1:]
if not severities:
sys.stderr.write(
f"Usage: {sys.argv[0]} [info] [warning] [error]\n",
)
sys.exit(1)
# Declare an exchange
direct_logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.DIRECT,
)
# Declaring random queue
queue = await channel.declare_queue(durable=True)
for severity in severities:
await queue.bind(direct_logs_exchange, routing_key=severity)
async with queue.iterator() as iterator:
message: AbstractIncomingMessage
async for message in iterator:
async with message.process():
print(f" [x] {message.routing_key!r}:{message.body!r}")
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
The code for emit_log_direct.py:
import asyncio
import sys
from aio_pika import DeliveryMode, ExchangeType, Message, connect
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.DIRECT,
)
message_body = b" ".join(
arg.encode() for arg in sys.argv[2:]
) or b"Hello World!"
message = Message(
message_body,
delivery_mode=DeliveryMode.PERSISTENT,
)
# Sending the message
routing_key = sys.argv[1] if len(sys.argv) > 2 else "info"
await logs_exchange.publish(message, routing_key=routing_key)
print(f" [x] Sent {message.body!r}")
if __name__ == "__main__":
asyncio.run(main())
Note
The callback-based code for receive_logs_direct.py:
import asyncio
import sys
from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage
async def on_message(message: AbstractIncomingMessage) -> None:
async with message.process():
print(" [x] %r:%r" % (message.routing_key, message.body))
async def main() -> None:
# Perform connection
connection = await connect("amqp://guest:guest@localhost/")
async with connection:
# Creating a channel
channel = await connection.channel()
await channel.set_qos(prefetch_count=1)
severities = sys.argv[1:]
if not severities:
sys.stderr.write(
"Usage: %s [info] [warning] [error]\n" % sys.argv[0],
)
sys.exit(1)
# Declare an exchange
direct_logs_exchange = await channel.declare_exchange(
"logs", ExchangeType.DIRECT,
)
# Declaring random queue
queue = await channel.declare_queue(durable=True)
for severity in severities:
await queue.bind(direct_logs_exchange, routing_key=severity)
# Start listening to the random queue
await queue.consume(on_message)
print(" [*] Waiting for messages. To exit press CTRL+C")
await asyncio.Future()
if __name__ == "__main__":
asyncio.run(main())
If you want to save only ‘warning’ and ‘error’ (and not ‘info’) log messages to a file, just open a console and type:
$ python receive_logs_direct_simple.py warning error > logs_from_rabbit.log
If you’d like to see all the log messages on your screen, open a new terminal and do:
$ python receive_logs_direct.py info warning error
[*] Waiting for logs. To exit press CTRL+C
And, for example, to emit an error log message just type:
$ python emit_log_direct.py error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'
Move on to tutorial 5 to find out how to listen for messages based on a pattern.
Note
This material was adapted from the official tutorial on rabbitmq.org.
Topics#
Warning
This is a beta version of the port of the official tutorial. If you find an error, please create an issue or a pull request.
Note
Using the aio-pika async Python client
Note
Prerequisites
This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). If you use a different host, port or credentials, the connection settings will require adjusting.
Where to get help
If you’re having trouble going through this tutorial you can contact us through the mailing list.
In the previous tutorial we improved our logging system. Instead of using a fanout exchange only capable of dummy broadcasting, we used a direct one, and gained the ability to receive logs selectively.
Although using the direct exchange improved our system, it still has limitations — it can’t do routing based on multiple criteria.
In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit…) and facility (auth/cron/kern…).
That would give us a lot of flexibility - we may want to listen to just critical errors coming from ‘cron’ but also all logs from ‘kern’.
To implement that in our logging system we need to learn about a more complex topic exchange.
Topic exchange#
Messages sent to a topic exchange can’t have an arbitrary routing_key - it must be a list of words, delimited by dots. The words can be anything, but usually they specify some features connected to the message. A few valid routing key examples: “stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”. There can be as many words in the routing key as you like, up to the limit of 255 bytes.
The binding key must also be in the same form. The logic behind the topic exchange is similar to a direct one - a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However there are two important special cases for binding keys:
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
It’s easiest to explain this in an example:
In this example, we’re going to send messages which all describe animals. The messages will be sent with a routing key that consists of three words (two dots). The first word in the routing key will describe a celerity, second a colour and third a species: “<celerity>.<colour>.<species>”.
We created three bindings: Q1 is bound with binding key “*.orange.*” and Q2 with “*.*.rabbit” and “lazy.#”.
These bindings can be summarised as:
Q1 is interested in all the orange animals.
Q2 wants to hear everything about rabbits, and everything about lazy animals.
A message with a routing key set to “quick.orange.rabbit” will be delivered to both queues. Message “lazy.orange.elephant” also will go to both of them. On the other hand “quick.orange.fox” will only go to the first queue, and “lazy.brown.fox” only to the second. “lazy.pink.rabbit” will be delivered to the second queue only once, even though it matches two bindings. “quick.brown.fox” doesn’t match any binding so it will be discarded.
What happens if we break our contract and send a message with one or four words, like “orange” or “quick.orange.male.rabbit”? Well, these messages won’t match any bindings and will be lost.
On the other hand “lazy.orange.male.rabbit”, even though it has four words, will match the last binding and will be delivered to the second queue.
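With aio-pika, the three bindings from this example could be declared like this (queue_1 and queue_2 stand for Q1 and Q2 and are assumed to be already declared):
topic_logs_exchange = await channel.declare_exchange(
    "topic_logs", ExchangeType.TOPIC,
)

# Q1: all the orange animals
await queue_1.bind(topic_logs_exchange, routing_key="*.orange.*")

# Q2: everything about rabbits and everything about lazy animals
await queue_2.bind(topic_logs_exchange, routing_key="*.*.rabbit")
await queue_2.bind(topic_logs_exchange, routing_key="lazy.#")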
Note
Topic exchange
Topic exchange is powerful and can behave like other exchanges.
When a queue is bound with “#” (hash) binding key - it will receive all the messages, regardless of the routing key - like in fanout exchange.
When special characters “*” (star) and “#” (hash) aren’t used in bindings, the topic exchange will behave just like a direct one.
Putting it all together#
We’re going to use a topic exchange in our logging system. We’ll start off with a working assumption that the routing keys of logs will have two words: “<facility>.<severity>”.
The code is almost the same as in the previous tutorial.
The code for emit_log_topic.py:
import asyncio
import sys

from aio_pika import DeliveryMode, ExchangeType, Message, connect


async def main() -> None:
    # Perform connection
    connection = await connect(
        "amqp://guest:guest@localhost/",
    )

    async with connection:
        # Creating a channel
        channel = await connection.channel()

        topic_logs_exchange = await channel.declare_exchange(
            "topic_logs", ExchangeType.TOPIC,
        )

        routing_key = sys.argv[1] if len(sys.argv) > 2 else "anonymous.info"

        message_body = b" ".join(
            arg.encode() for arg in sys.argv[2:]
        ) or b"Hello World!"

        message = Message(
            message_body,
            delivery_mode=DeliveryMode.PERSISTENT,
        )

        # Sending the message
        await topic_logs_exchange.publish(message, routing_key=routing_key)

        print(f" [x] Sent {message!r}")


if __name__ == "__main__":
    asyncio.run(main())
The code for receive_logs_topic.py:
import asyncio
import sys

from aio_pika import ExchangeType, connect
from aio_pika.abc import AbstractIncomingMessage


async def main() -> None:
    # Perform connection
    connection = await connect("amqp://guest:guest@localhost/")

    # Creating a channel
    channel = await connection.channel()
    await channel.set_qos(prefetch_count=1)

    # Declare an exchange
    topic_logs_exchange = await channel.declare_exchange(
        "topic_logs", ExchangeType.TOPIC,
    )

    # Declaring an exclusive, server-named queue for this receiver
    queue = await channel.declare_queue(exclusive=True)

    binding_keys = sys.argv[1:]

    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
        sys.exit(1)

    for binding_key in binding_keys:
        await queue.bind(topic_logs_exchange, routing_key=binding_key)

    print(" [*] Waiting for messages. To exit press CTRL+C")

    # Start listening to the queue
    async with queue.iterator() as iterator:
        message: AbstractIncomingMessage
        async for message in iterator:
            async with message.process():
                print(f" [x] {message.routing_key!r}:{message.body!r}")


if __name__ == "__main__":
    asyncio.run(main())
To receive all the logs run:
python receive_logs_topic.py "#"
To receive all logs from the facility “kern”:
python receive_logs_topic.py "kern.*"
Or if you want to hear only about “critical” logs:
python receive_logs_topic.py "*.critical"
You can create multiple bindings:
python receive_logs_topic.py "kern.*" "*.critical"
And to emit a log with a routing key “kern.critical” type:
python emit_log_topic.py "kern.critical" "A critical kernel error"
Have fun playing with these programs. Note that the code doesn’t make any assumptions about the routing or binding keys; you may want to play with more than two routing key parameters.
Move on to tutorial 6 to learn about RPC.
Note
This material was adapted from the official tutorial on rabbitmq.org.
Remote procedure call (RPC)#
Warning
This is a beta version of the port from the official tutorial. If you find an error, please create an issue or pull request.
This implementation is a part of the official tutorial. Since version 1.7.0 aio-pika has a patterns submodule.
You might use aio_pika.patterns.RPC for real projects.
Note
Using the aio-pika async Python client
Note
Prerequisites
This tutorial assumes RabbitMQ is installed and running on localhost on the standard port (5672). In case you use a different host, port or credentials, the connection settings would require adjusting.
Where to get help
If you’re having trouble going through this tutorial you can contact us through the mailing list.
In the second tutorial we learned how to use Work Queues to distribute time-consuming tasks among multiple workers.
But what if we need to run a function on a remote computer and wait for the result? Well, that’s a different story. This pattern is commonly known as Remote Procedure Call or RPC.
In this tutorial we’re going to use RabbitMQ to build an RPC system: a client and a scalable RPC server. As we don’t have any time-consuming tasks that are worth distributing, we’re going to create a dummy RPC service that returns Fibonacci numbers.
Client interface#
To illustrate how an RPC service could be used we’re going to create a simple client class. It’s going to expose a method named call which sends an RPC request and waits until the answer is received:
async def main():
    fibonacci_rpc = FibonacciRpcClient()
    result = await fibonacci_rpc.call(4)
    print("fib(4) is %r" % result)
Note
A note on RPC
Although RPC is a pretty common pattern in computing, it’s often criticised. The problems arise when a programmer is not aware whether a function call is local or if it’s a slow RPC. Confusion like that results in an unpredictable system and adds unnecessary complexity to debugging. Instead of simplifying software, misused RPC can result in unmaintainable spaghetti code.
Bearing that in mind, consider the following advice:
Make sure it’s obvious which function call is local and which is remote.
Document your system. Make the dependencies between components clear.
Handle error cases. How should the client react when the RPC server is down for a long time?
When in doubt avoid RPC. If you can, you should use an asynchronous pipeline - instead of RPC-like blocking, results are asynchronously pushed to the next computation stage.
Callback queue#
In general doing RPC over RabbitMQ is easy. A client sends a request message and a server replies with a response message. In order to receive a response the client needs to send a ‘callback’ queue address with the request. Let’s try it:
async def main():
    ...

    # Queue for results
    callback_queue = await channel.declare_queue(exclusive=True)

    await channel.default_exchange.publish(
        Message(
            request,
            reply_to=callback_queue.name
        ),
        routing_key='rpc_queue'
    )

    # ... and some code to read a response message from the callback_queue ...
    ...
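A naive way to read that response, assuming only one request is in flight at a time, could look like the sketch below; the full client later in this tutorial does this properly with correlation ids.
async def read_response(callback_queue):
    # Return the body of the first message that arrives in the callback queue
    async with callback_queue.iterator() as queue_iter:
        async for message in queue_iter:
            async with message.process():
                return message.body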
Note
Message properties
The AMQP protocol predefines a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:
delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.
content_type: Used to describe the MIME type of the encoding. For example, for the often-used JSON encoding it is good practice to set this property to application/json.
reply_to: Commonly used to name a callback queue.
correlation_id: Useful to correlate RPC responses with requests.
See additional info in aio_pika.Message
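As a rough illustration, the commonly used properties can be set directly on aio_pika.Message; the values below (the JSON body, the queue name and the correlation id) are placeholders invented for this example.
from aio_pika import DeliveryMode, Message

request = Message(
    body=b'{"n": 30}',
    content_type="application/json",        # MIME type of the body
    delivery_mode=DeliveryMode.PERSISTENT,  # persistent delivery (value 2)
    reply_to="amq.gen-example-callback",    # callback queue name (placeholder)
    correlation_id="3f1c...",               # unique id per request (placeholder)
)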
Correlation id#
In the method presented above we suggest creating a callback queue for every RPC request. That’s pretty inefficient, but fortunately there is a better way - let’s create a single callback queue per client.
That raises a new issue: having received a response in that queue, it’s not clear to which request the response belongs. That’s when the correlation_id property is used. We’re going to set it to a unique value for every request. Later, when we receive a message in the callback queue we’ll look at this property, and based on that we’ll be able to match a response with a request. If we see an unknown correlation_id value, we may safely discard the message - it doesn’t belong to our requests.
You may ask, why should we ignore unknown messages in the callback queue, rather than failing with an error? It’s due to a possibility of a race condition on the server side. Although unlikely, it is possible that the RPC server will die just after sending us the answer, but before sending an acknowledgment message for the request. If that happens, the restarted RPC server will process the request again. That’s why on the client we must handle the duplicate responses gracefully, and the RPC should ideally be idempotent.
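The essence of this matching, extracted from the full client shown later, is just a mapping from correlation_id to a pending future; unknown ids are dropped. A minimal sketch:
import asyncio
from typing import MutableMapping

futures: MutableMapping[str, asyncio.Future] = {}


def on_response(message) -> None:
    # Look up the future stored when the request was published;
    # silently discard responses with an unknown correlation_id
    future = futures.pop(message.correlation_id, None)
    if future is not None:
        future.set_result(message.body)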
Summary#
Our RPC will work like this:
When the Client starts up, it creates an anonymous exclusive callback queue.
For an RPC request, the Client sends a message with two properties: reply_to, which is set to the callback queue, and correlation_id, which is set to a unique value for every request.
The request is sent to an rpc_queue queue.
The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the reply_to field.
The client waits for data on the callback queue. When a message appears, it checks the correlation_id property. If it matches the value from the request it returns the response to the application.
Putting it all together#
The code for rpc_server.py:
1import asyncio
2import logging
3
4from aio_pika import Message, connect
5from aio_pika.abc import AbstractIncomingMessage
6
7
8def fib(n: int) -> int:
9 if n == 0:
10 return 0
11 elif n == 1:
12 return 1
13 else:
14 return fib(n - 1) + fib(n - 2)
15
16
17async def main() -> None:
18 # Perform connection
19 connection = await connect("amqp://guest:guest@localhost/")
20
21 # Creating a channel
22 channel = await connection.channel()
23 exchange = channel.default_exchange
24
25 # Declaring queue
26 queue = await channel.declare_queue("rpc_queue")
27
28 print(" [x] Awaiting RPC requests")
29
30 # Start listening the queue with name 'hello'
31 async with queue.iterator() as qiterator:
32 message: AbstractIncomingMessage
33 async for message in qiterator:
34 try:
35 async with message.process(requeue=False):
36 assert message.reply_to is not None
37
38 n = int(message.body.decode())
39
40 print(f" [.] fib({n})")
41 response = str(fib(n)).encode()
42
43 await exchange.publish(
44 Message(
45 body=response,
46 correlation_id=message.correlation_id,
47 ),
48 routing_key=message.reply_to,
49 )
50 print("Request complete")
51 except Exception:
52 logging.exception("Processing error for message %r", message)
53
54if __name__ == "__main__":
55 asyncio.run(main())
The server code is rather straightforward:
As usual we start by establishing the connection and declaring the queue.
(8) We declare our fibonacci function. It assumes only valid positive integer input. (Don’t expect this one to work for big numbers, it’s probably the slowest recursive implementation possible).
(31) The consuming loop is the core of the RPC server. It’s executed when a request is received: it does the work and publishes the response back to the queue named in reply_to, copying the request’s correlation_id.
The code for rpc_client.py:
1import asyncio
2import uuid
3from typing import MutableMapping
4
5from aio_pika import Message, connect
6from aio_pika.abc import (
7 AbstractChannel, AbstractConnection, AbstractIncomingMessage, AbstractQueue,
8)
9
10
11class FibonacciRpcClient:
12 connection: AbstractConnection
13 channel: AbstractChannel
14 callback_queue: AbstractQueue
15 loop: asyncio.AbstractEventLoop
16
17 def __init__(self) -> None:
18 self.futures: MutableMapping[str, asyncio.Future] = {}
19 self.loop = asyncio.get_running_loop()
20
21 async def connect(self) -> "FibonacciRpcClient":
22 self.connection = await connect(
23 "amqp://guest:guest@localhost/", loop=self.loop,
24 )
25 self.channel = await self.connection.channel()
26 self.callback_queue = await self.channel.declare_queue(exclusive=True)
27 await self.callback_queue.consume(self.on_response)
28
29 return self
30
31 def on_response(self, message: AbstractIncomingMessage) -> None:
32 if message.correlation_id is None:
33 print(f"Bad message {message!r}")
34 return
35
36 future: asyncio.Future = self.futures.pop(message.correlation_id)
37 future.set_result(message.body)
38
39 async def call(self, n: int) -> int:
40 correlation_id = str(uuid.uuid4())
41 future = self.loop.create_future()
42
43 self.futures[correlation_id] = future
44
45 await self.channel.default_exchange.publish(
46 Message(
47 str(n).encode(),
48 content_type="text/plain",
49 correlation_id=correlation_id,
50 reply_to=self.callback_queue.name,
51 ),
52 routing_key="rpc_queue",
53 )
54
55 return int(await future)
56
57
58async def main() -> None:
59 fibonacci_rpc = await FibonacciRpcClient().connect()
60 print(" [x] Requesting fib(30)")
61 response = await fibonacci_rpc.call(30)
62 print(f" [.] Got {response!r}")
63
64
65if __name__ == "__main__":
66 asyncio.run(main())
The client code is slightly more involved:
We establish a connection, channel and declare an exclusive ‘callback’ queue for replies.
We subscribe to the ‘callback’ queue, so that we can receive RPC responses.
(31) The ‘on_response’ callback executed on every response is doing a very simple job: for every response message it looks up the pending future by correlation_id and sets the message body as its result. Messages with an unknown or missing correlation_id are ignored.
Next, we define our main call method - it does the actual RPC request.
(39) In this method, first we generate a unique correlation_id and store a future for it - the ‘on_response’ callback function will use this value to catch the appropriate response.
(45) Next, we publish the request message, with two properties: reply_to and correlation_id. Finally, we await the future and return the response back to the user.
Our RPC service is now ready. We can start the server:
$ python rpc_server.py
[x] Awaiting RPC requests
To request a fibonacci number run the client:
$ python rpc_client.py
[x] Requesting fib(30)
The presented design is not the only possible implementation of an RPC service, but it has some important advantages:
If the RPC server is too slow, you can scale up by just running another one. Try running a second rpc_server.py in a new console.
On the client side, the RPC requires sending and receiving only one message. No synchronous calls like queue_declare are required. As a result the RPC client needs only one network round trip for a single RPC request.
Our code is still pretty simplistic and doesn’t try to solve more complex (but important) problems, like:
How should the client react if there are no servers running?
Should a client have some kind of timeout for the RPC?
If the server malfunctions and raises an exception, should it be forwarded to the client?
Protecting against invalid incoming messages (e.g. checking bounds) before processing.
Note
If you want to experiment, you may find the rabbitmq-management plugin useful for viewing the queues.
Note
This material was adapted from the official tutorial on rabbitmq.org.
API Reference#
- aio_pika.AMQPException#
alias of
AMQPError
- class aio_pika.Channel(connection: AbstractConnection, channel_number: Optional[int] = None, publisher_confirms: bool = True, on_return_raises: bool = False)[source]#
Channel abstraction
- Parameters
connection – aio_pika.adapter.AsyncioConnection instance
loop – Event loop (asyncio.get_event_loop() when None)
future_store – aio_pika.common.FutureStore instance
publisher_confirms – False if you don’t need delivery confirmations (in pursuit of performance)
- async declare_exchange(name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT, *, durable: bool = False, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None) AbstractExchange [source]#
Declare an exchange.
- Parameters
name – string with exchange name or aio_pika.exchange.Exchange instance
type – Exchange type. Enum ExchangeType value or string. String values must be one of ‘fanout’, ‘direct’, ‘topic’, ‘headers’, ‘x-delayed-message’, ‘x-consistent-hash’.
durable – Durability (the exchange survives broker restart)
auto_delete – Delete the exchange when the channel is closed.
internal – Do not send it to the broker, just create the object
passive – Do not fail when the entity was declared previously but has different params. Raises aio_pika.exceptions.ChannelClosed when the exchange doesn’t exist.
arguments – additional arguments
timeout – execution timeout
- Returns
aio_pika.exchange.Exchange instance
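For example, a durable fanout exchange could be declared like this (a sketch: "channel" is assumed to be an open channel and the name "logs" is arbitrary):
from aio_pika import ExchangeType

logs_exchange = await channel.declare_exchange(
    "logs", ExchangeType.FANOUT, durable=True,
)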
- async declare_queue(name: Optional[str] = None, *, durable: bool = False, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None) AbstractQueue [source]#
- Parameters
name – queue name
durable – Durability (the queue survives broker restart)
exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.
passive – Do not fail when the entity was declared previously but has different params. Raises aio_pika.exceptions.ChannelClosed when the queue doesn’t exist.
auto_delete – Delete the queue when the channel is closed.
arguments – additional arguments
timeout – execution timeout
- Returns
aio_pika.queue.Queue instance
- Raises
aio_pika.exceptions.ChannelClosed instance
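For example (a sketch: "channel" is assumed to be an open channel and the queue names are arbitrary):
# A named, durable queue that survives broker restarts
task_queue = await channel.declare_queue("task_queue", durable=True)

# A server-named, exclusive queue that lives only as long as this connection
temp_queue = await channel.declare_queue(exclusive=True)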
- async get_exchange(name: str, *, ensure: bool = True) AbstractExchange [source]#
With ensure=True, it’s a shortcut for .declare_exchange(..., passive=True); otherwise, it returns an exchange instance without checking its existence.
When the exchange does not exist, if ensure=True, aio_pika.exceptions.ChannelClosed will be raised.
Use this method in a separate channel (or as soon as the channel is created). This is the only way to get an exchange without declaring a new one.
- Parameters
name – exchange name
ensure – ensure that the exchange exists
- Returns
aio_pika.exchange.Exchange instance
- Raises
aio_pika.exceptions.ChannelClosed instance
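A usage sketch, assuming an open channel and a "logs" exchange declared elsewhere:
# With ensure=True (the default) a missing exchange raises ChannelClosed
logs_exchange = await channel.get_exchange("logs")

# With ensure=False no existence check is performed
maybe_exchange = await channel.get_exchange("logs", ensure=False)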
- async get_queue(name: str, *, ensure: bool = True) AbstractQueue [source]#
With ensure=True, it’s a shortcut for .declare_queue(..., passive=True); otherwise, it returns a queue instance without checking its existence.
When the queue does not exist, if ensure=True, aio_pika.exceptions.ChannelClosed will be raised.
Use this method in a separate channel (or as soon as the channel is created). This is the only way to get a queue without declaring a new one.
- Parameters
name – queue name
ensure – ensure that the queue exists
- Returns
aio_pika.queue.Queue instance
- Raises
aio_pika.exceptions.ChannelClosed instance
- property is_closed: bool#
Returns True when the channel has been closed from the broker side or after the close() method has been called.
- property is_initialized: bool#
Returns True when the channel has been opened and is ready for interaction
- class aio_pika.Connection(url: URL, loop: Optional[AbstractEventLoop] = None, ssl_context: Optional[SSLContext] = None, **kwargs: Any)[source]#
Connection abstraction
- channel(channel_number: Optional[int] = None, publisher_confirms: bool = True, on_return_raises: bool = False) AbstractChannel [source]#
Coroutine which returns a new instance of Channel.
Example:
import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    channel1 = connection.channel()
    await channel1.close()

    # Creates channel with specific channel number
    channel42 = connection.channel(42)
    await channel42.close()

    # For working with transactions
    channel_no_confirms = await connection.channel(
        publisher_confirms=False
    )
    await channel_no_confirms.close()
Also available as an asynchronous context manager:
import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    async with connection.channel() as channel:
        # channel is open and available
        ...

    # channel is now closed
- Parameters
channel_number – specify the channel number explicitly
publisher_confirms – if True, the aio_pika.Exchange.publish() method will return bool after the publish is complete; otherwise aio_pika.Exchange.publish() will return None
on_return_raises – raise aio_pika.exceptions.DeliveryError when a mandatory message is returned
- class aio_pika.Exchange(channel: AbstractChannel, name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT, *, auto_delete: bool = False, durable: bool = False, internal: bool = False, passive: bool = False, arguments: Optional[Dict[str, FieldValue]] = None)[source]#
Exchange abstraction
- async bind(exchange: Union[AbstractExchange, str], routing_key: str = '', *, arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None) BindOk [source]#
A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
client = await connect()

routing_key = 'simple_routing_key'
src_exchange_name = "source_exchange"
dest_exchange_name = "destination_exchange"

channel = await client.channel()

src_exchange = await channel.declare_exchange(
    src_exchange_name, auto_delete=True
)
dest_exchange = await channel.declare_exchange(
    dest_exchange_name, auto_delete=True
)
queue = await channel.declare_queue(auto_delete=True)

await queue.bind(dest_exchange, routing_key)
await dest_exchange.bind(src_exchange, routing_key)
- Parameters
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Returns
None
- async delete(if_unused: bool = False, timeout: Optional[Union[float, int]] = None) DeleteOk [source]#
Delete the exchange
- Parameters
timeout – operation timeout
if_unused – perform deletion only when the exchange has no bindings
- async publish(message: AbstractMessage, routing_key: str, *, mandatory: bool = True, immediate: bool = False, timeout: Optional[Union[float, int]] = None) Optional[Union[Ack, Nack, Reject]] [source]#
Publish the message via this exchange with the given routing key. aio-pika uses the publisher confirms extension for message delivery.
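A usage sketch, assuming "exchange" was obtained from declare_exchange() and the routing key is just an example:
from aio_pika import Message

confirmation = await exchange.publish(
    Message(body=b"hello"),
    routing_key="task_queue",
)
# With publisher confirms enabled (the default) the returned value is the
# broker's confirmation (Ack/Nack) for this publish, per the signature above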
- async unbind(exchange: Union[AbstractExchange, str], routing_key: str = '', arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None) UnbindOk [source]#
Remove exchange-to-exchange binding for this Exchange instance
- Parameters
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Returns
None
- class aio_pika.IncomingMessage(message: DeliveredMessage, no_ack: bool = False)[source]#
An incoming message looks like a Message but has additional methods for message acknowledgement.
Depending on the acknowledgement mode used, RabbitMQ can consider a message to be successfully delivered either immediately after it is sent out (written to a TCP socket) or when an explicit (“manual”) client acknowledgement is received. Manually sent acknowledgements can be positive or negative and use one of the following protocol methods:
basic.ack is used for positive acknowledgements
basic.nack is used for negative acknowledgements (note: this is a RabbitMQ extension to AMQP 0-9-1)
basic.reject is used for negative acknowledgements but has one limitation compared to basic.nack
Positive acknowledgements simply instruct RabbitMQ to record a message as delivered. Negative acknowledgements with basic.reject have the same effect. The difference is primarily in the semantics: positive acknowledgements assume a message was successfully processed while their negative counterpart suggests that a delivery wasn’t processed but still should be deleted.
Create an instance of
IncomingMessage
- async ack(multiple: bool = False) None [source]#
Send basic.ack, which is used for positive acknowledgements
Note
This method looks like a blocking-method, but actually it just sends bytes to the socket and doesn’t require any responses from the broker.
- Parameters
multiple – If set to True, the message’s delivery tag is treated as “up to and including”, so that multiple messages can be acknowledged with a single method. If set to False, the ack refers to a single message.
- Returns
None
- process(requeue: bool = False, reject_on_redelivered: bool = False, ignore_processed: bool = False) AbstractProcessContext [source]#
Context manager for processing the message
>>> async def on_message_received(message: IncomingMessage):
...     async with message.process():
...         # When exception will be raised
...         # the message will be rejected
...         print(message.body)
Example with ignore_processed=True
>>> async def on_message_received(message: IncomingMessage):
...     async with message.process(ignore_processed=True):
...         # Now (with ignore_processed=True) you may reject
...         # (or ack) message manually too
...         if True:  # some reasonable condition here
...             await message.reject()
...         print(message.body)
- Parameters
requeue – Requeue the message when an exception is raised.
reject_on_redelivered – When True, the message will be rejected only if it was redelivered.
ignore_processed – Do nothing if message already processed
- async reject(requeue: bool = False) None [source]#
When requeue=True the message will be returned to the queue. Otherwise the message will be dropped.
Note
This method looks like a blocking-method, but actually it just sends bytes to the socket and doesn’t require any responses from the broker.
- Parameters
requeue – bool
- class aio_pika.Message(body: bytes, *, headers: Optional[MutableMapping[str, Union[bool, bytes, bytearray, Decimal, List[Union[bool, bytes, bytearray, Decimal, List[FieldValue], Dict[str, FieldValue], float, int, None, str, datetime]], Dict[str, Union[bool, bytes, bytearray, Decimal, List[FieldValue], Dict[str, FieldValue], float, int, None, str, datetime]], float, int, None, str, datetime, Set[Union[bool, bytes, bytearray, Decimal, List[Union[bool, bytes, bytearray, Decimal, List[FieldValue], Dict[str, FieldValue], float, int, None, str, datetime]], Dict[str, Union[bool, bytes, bytearray, Decimal, List[FieldValue], Dict[str, FieldValue], float, int, None, str, datetime]], float, int, None, str, datetime]], Tuple[Union[bool, bytes, bytearray, Decimal, List[Union[bool, bytes, bytearray, Decimal, List[FieldValue], Dict[str, FieldValue], float, int, None, str, datetime]], Dict[str, Union[bool, bytes, bytearray, Decimal, List[FieldValue], Dict[str, FieldValue], float, int, None, str, datetime]], float, int, None, str, datetime], ...], FrozenSet[Union[bool, bytes, bytearray, Decimal, List[Union[bool, bytes, bytearray, Decimal, List[FieldValue], Dict[str, FieldValue], float, int, None, str, datetime]], Dict[str, Union[bool, bytes, bytearray, Decimal, List[FieldValue], Dict[str, FieldValue], float, int, None, str, datetime]], float, int, None, str, datetime]]]]] = None, content_type: Optional[str] = None, content_encoding: Optional[str] = None, delivery_mode: Optional[Union[DeliveryMode, int]] = None, priority: Optional[int] = None, correlation_id: Optional[str] = None, reply_to: Optional[str] = None, expiration: Optional[Union[int, datetime, float, timedelta]] = None, message_id: Optional[str] = None, timestamp: Optional[Union[int, datetime, float, timedelta]] = None, type: Optional[str] = None, user_id: Optional[str] = None, app_id: Optional[str] = None)[source]#
AMQP message abstraction
Creates a new instance of Message
- Parameters
body – message body
headers – message headers
content_type – content type
content_encoding – content encoding
delivery_mode – delivery mode
priority – priority
correlation_id – correlation id
reply_to – reply to
expiration – expiration in seconds (or datetime or timedelta)
message_id – message id
timestamp – timestamp
type – type
user_id – user id
app_id – app id
- info() dict [source]#
Create a dict with message attributes
{ "body_size": 100, "headers": {}, "content_type": "text/plain", "content_encoding": "", "delivery_mode": DeliveryMode.NOT_PERSISTENT, "priority": 0, "correlation_id": "", "reply_to": "", "expiration": "", "message_id": "", "timestamp": "", "type": "", "user_id": "", "app_id": "", }
- property locked: bool#
is message locked
- Returns
bool
- property properties: Properties#
Build
aiormq.spec.Basic.Properties
object
- class aio_pika.Queue(channel: AbstractChannel, name: Optional[str], durable: bool, exclusive: bool, auto_delete: bool, arguments: Optional[Dict[str, FieldValue]], passive: bool = False)[source]#
AMQP queue abstraction
- async bind(exchange: Union[AbstractExchange, str], routing_key: Optional[str] = None, *, arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None) BindOk [source]#
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
- Parameters
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Raises
asyncio.TimeoutError – when the binding timeout period has elapsed.
- Returns
None
- async cancel(consumer_tag: str, timeout: Optional[Union[float, int]] = None, nowait: bool = False) CancelOk [source]#
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.
- Parameters
consumer_tag – consumer tag returned by
consume()
timeout – execution timeout
nowait (bool) – Do not expect a Basic.CancelOk response
- Returns
Basic.CancelOk when operation completed successfully
- async consume(callback: Callable[[AbstractIncomingMessage], Any], no_ack: bool = False, exclusive: bool = False, arguments: Optional[Dict[str, FieldValue]] = None, consumer_tag: Optional[str] = None, timeout: Optional[Union[float, int]] = None) str [source]#
Start consuming the Queue.
- Parameters
timeout – asyncio.TimeoutError will be raised when the Future is not finished after this time.
callback – Consuming callback. Could be a coroutine.
no_ack – if True you don’t need to call aio_pika.message.IncomingMessage.ack()
exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.
arguments – additional arguments
consumer_tag – optional consumer tag
- Raises
asyncio.TimeoutError – when the consuming timeout period has elapsed.
- Returns
consumer tag (str)
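A usage sketch, assuming "queue" was obtained from declare_queue():
from aio_pika.abc import AbstractIncomingMessage


async def on_message(message: AbstractIncomingMessage) -> None:
    async with message.process():
        print(message.body)

# Keep the returned consumer tag if you want to cancel() the consumer later
consumer_tag = await queue.consume(on_message)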
- async declare(timeout: Optional[Union[float, int]] = None) DeclareOk [source]#
Declare queue.
- Parameters
timeout – execution timeout
passive – Only check to see if the queue exists.
- Returns
None
- async delete(*, if_unused: bool = True, if_empty: bool = True, timeout: Optional[Union[float, int]] = None) DeleteOk [source]#
Delete the queue.
- Parameters
if_unused – Perform delete only when unused
if_empty – Perform delete only when empty
timeout – execution timeout
- Returns
None
- async get(*, no_ack: bool = False, fail: bool = True, timeout: Optional[Union[float, int]] = 5) Optional[IncomingMessage] [source]#
Get message from the queue.
- Parameters
no_ack – if True you don’t need to call aio_pika.message.IncomingMessage.ack()
timeout – execution timeout
fail – Should return None instead of raising the exception aio_pika.exceptions.QueueEmpty.
- Returns
aio_pika.message.IncomingMessage instance, or None when fail is False and the queue is empty
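A usage sketch, assuming "queue" was obtained from declare_queue():
from aio_pika.exceptions import QueueEmpty

# With fail=False, None is returned when the queue is empty
message = await queue.get(fail=False)
if message is not None:
    await message.ack()

# With the default fail=True, QueueEmpty is raised instead
try:
    message = await queue.get()
except QueueEmpty:
    pass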
- iterator(**kwargs: Any) AbstractQueueIterator [source]#
Returns an iterator for async for expression.
Full example:
import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('test')

        async with queue.iterator() as q:
            async for message in q:
                print(message.body)
When your program runs with run_forever the iterator will be closed in the background. In this case the context manager for the iterator might be skipped and the queue might be used in the “async for” expression directly.
import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('test')

        async for message in queue:
            print(message.body)
- Returns
QueueIterator
- async purge(no_wait: bool = False, timeout: Optional[Union[float, int]] = None) PurgeOk [source]#
Purge all messages from the queue.
- Parameters
no_wait – no wait response
timeout – execution timeout
- Returns
None
- async unbind(exchange: Union[AbstractExchange, str], routing_key: Optional[str] = None, arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None) UnbindOk [source]#
Remove binding from exchange for this Queue instance
- Parameters
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Raises
asyncio.TimeoutError – when the unbinding timeout period has elapsed.
- Returns
None
- class aio_pika.RobustChannel(connection: AbstractConnection, channel_number: Optional[int] = None, publisher_confirms: bool = True, on_return_raises: bool = False)[source]#
Channel abstraction
- Parameters
connection – aio_pika.adapter.AsyncioConnection instance
loop – Event loop (asyncio.get_event_loop() when None)
future_store – aio_pika.common.FutureStore instance
publisher_confirms – False if you don’t need delivery confirmations (in pursuit of performance)
- EXCHANGE_CLASS#
alias of
RobustExchange
- QUEUE_CLASS#
alias of
RobustQueue
- async declare_exchange(name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT, durable: bool = False, auto_delete: bool = False, internal: bool = False, passive: bool = False, arguments: Optional[Dict[str, Any]] = None, timeout: Optional[Union[float, int]] = None, robust: bool = True) AbstractRobustExchange [source]#
Declare an exchange.
- Parameters
name – string with exchange name or aio_pika.exchange.Exchange instance
type – Exchange type. Enum ExchangeType value or string. String values must be one of ‘fanout’, ‘direct’, ‘topic’, ‘headers’, ‘x-delayed-message’, ‘x-consistent-hash’.
durable – Durability (the exchange survives broker restart)
auto_delete – Delete the exchange when the channel is closed.
internal – Do not send it to the broker, just create the object
passive – Do not fail when the entity was declared previously but has different params. Raises aio_pika.exceptions.ChannelClosed when the exchange doesn’t exist.
arguments – additional arguments
timeout – execution timeout
- Returns
aio_pika.exchange.Exchange instance
- async declare_queue(name: Optional[str] = None, *, durable: bool = False, exclusive: bool = False, passive: bool = False, auto_delete: bool = False, arguments: Optional[Dict[str, Any]] = None, timeout: Optional[Union[float, int]] = None, robust: bool = True) AbstractRobustQueue [source]#
- Parameters
name – queue name
durable – Durability (the queue survives broker restart)
exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections is not allowed.
passive – Do not fail when the entity was declared previously but has different params. Raises aio_pika.exceptions.ChannelClosed when the queue doesn’t exist.
auto_delete – Delete the queue when the channel is closed.
arguments – additional arguments
timeout – execution timeout
- Returns
aio_pika.queue.Queue instance
- Raises
aio_pika.exceptions.ChannelClosed instance
- class aio_pika.RobustConnection(url: URL, loop: Optional[AbstractEventLoop] = None, **kwargs: Any)[source]#
Robust connection
- CHANNEL_CLASS#
alias of
RobustChannel
- channel(channel_number: Optional[int] = None, publisher_confirms: bool = True, on_return_raises: bool = False) AbstractRobustChannel [source]#
Coroutine which returns a new instance of Channel.
Example:
import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    channel1 = connection.channel()
    await channel1.close()

    # Creates channel with specific channel number
    channel42 = connection.channel(42)
    await channel42.close()

    # For working with transactions
    channel_no_confirms = await connection.channel(
        publisher_confirms=False
    )
    await channel_no_confirms.close()
Also available as an asynchronous context manager:
import aio_pika

async def main(loop):
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )

    async with connection.channel() as channel:
        # channel is open and available
        ...

    # channel is now closed
- Parameters
channel_number – specify the channel number explicitly
publisher_confirms – if True, the aio_pika.Exchange.publish() method will return bool after the publish is complete; otherwise aio_pika.Exchange.publish() will return None
on_return_raises – raise aio_pika.exceptions.DeliveryError when a mandatory message is returned
- class aio_pika.RobustExchange(channel: AbstractChannel, name: str, type: Union[ExchangeType, str] = ExchangeType.DIRECT, *, auto_delete: bool = False, durable: bool = False, internal: bool = False, passive: bool = False, arguments: Optional[Dict[str, FieldValue]] = None)[source]#
Exchange abstraction
- async bind(exchange: Union[AbstractExchange, str], routing_key: str = '', *, arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None, robust: bool = True) BindOk [source]#
A binding can also be a relationship between two exchanges. This can be simply read as: this exchange is interested in messages from another exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
client = await connect()

routing_key = 'simple_routing_key'
src_exchange_name = "source_exchange"
dest_exchange_name = "destination_exchange"

channel = await client.channel()

src_exchange = await channel.declare_exchange(
    src_exchange_name, auto_delete=True
)
dest_exchange = await channel.declare_exchange(
    dest_exchange_name, auto_delete=True
)
queue = await channel.declare_queue(auto_delete=True)

await queue.bind(dest_exchange, routing_key)
await dest_exchange.bind(src_exchange, routing_key)
- Parameters
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Returns
None
- async unbind(exchange: Union[AbstractExchange, str], routing_key: str = '', arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None) UnbindOk [source]#
Remove exchange-to-exchange binding for this Exchange instance
- Parameters
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Returns
None
- class aio_pika.RobustQueue(channel: AbstractChannel, name: Optional[str], durable: bool = False, exclusive: bool = False, auto_delete: bool = False, arguments: Optional[Dict[str, FieldValue]] = None, passive: bool = False)[source]#
- async bind(exchange: Union[AbstractExchange, str], routing_key: Optional[str] = None, *, arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None, robust: bool = True) BindOk [source]#
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
Bindings can take an extra routing_key parameter. To avoid the confusion with a basic_publish parameter we’re going to call it a binding key.
- Parameters
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Raises
asyncio.TimeoutError – when the binding timeout period has elapsed.
- Returns
None
- async cancel(consumer_tag: str, timeout: Optional[Union[float, int]] = None, nowait: bool = False) CancelOk [source]#
This method cancels a consumer. This does not affect already delivered messages, but it does mean the server will not send any more messages for that consumer. The client may receive an arbitrary number of messages in between sending the cancel method and receiving the cancel-ok reply. It may also be sent from the server to the client in the event of the consumer being unexpectedly cancelled (i.e. cancelled for any reason other than the server receiving the corresponding basic.cancel from the client). This allows clients to be notified of the loss of consumers due to events such as queue deletion.
- Parameters
consumer_tag – consumer tag returned by
consume()
timeout – execution timeout
nowait (bool) – Do not expect a Basic.CancelOk response
- Returns
Basic.CancelOk when operation completed successfully
- async consume(callback: Callable[[AbstractIncomingMessage], Any], no_ack: bool = False, exclusive: bool = False, arguments: Optional[Dict[str, FieldValue]] = None, consumer_tag: Optional[str] = None, timeout: Optional[Union[float, int]] = None, robust: bool = True) str [source]#
Start consuming the Queue.
- Parameters
timeout – asyncio.TimeoutError will be raised when the Future is not finished after this time.
callback – Consuming callback. Could be a coroutine.
no_ack – if True you don’t need to call aio_pika.message.IncomingMessage.ack()
exclusive – Makes this queue exclusive. Exclusive queues may only be accessed by the current connection, and are deleted when that connection closes. Passive declaration of an exclusive queue by other connections are not allowed.
arguments – additional arguments
consumer_tag – optional consumer tag
- Raises
asyncio.TimeoutError – when the consuming timeout period has elapsed.
- Returns
consumer tag (str)
- iterator(**kwargs: Any) AbstractQueueIterator [source]#
Returns an iterator for async for expression.
Full example:
import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('test')

        async with queue.iterator() as q:
            async for message in q:
                print(message.body)
When your program runs with run_forever the iterator will be closed in the background. In this case the context manager for the iterator might be skipped and the queue might be used in the “async for” expression directly.
import aio_pika

async def main():
    connection = await aio_pika.connect()

    async with connection:
        channel = await connection.channel()
        queue = await channel.declare_queue('test')

        async for message in queue:
            print(message.body)
- Returns
QueueIterator
- async unbind(exchange: Union[AbstractExchange, str], routing_key: Optional[str] = None, arguments: Optional[Dict[str, FieldValue]] = None, timeout: Optional[Union[float, int]] = None) UnbindOk [source]#
Remove binding from exchange for this Queue instance
- Parameters
exchange – aio_pika.exchange.Exchange instance
routing_key – routing key
arguments – additional arguments
timeout – execution timeout
- Raises
asyncio.TimeoutError – when the unbinding timeout period has elapsed.
- Returns
None
- async aio_pika.connect(url: ~typing.Optional[~typing.Union[str, ~yarl.URL]] = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: ~typing.Optional[~asyncio.events.AbstractEventLoop] = None, ssl_options: ~typing.Optional[~aio_pika.abc.SSLOptions] = None, ssl_context: ~typing.Optional[~ssl.SSLContext] = None, timeout: ~typing.Optional[~typing.Union[float, int]] = None, client_properties: ~typing.Optional[~typing.Dict[str, FieldValue]] = None, connection_class: ~typing.Type[~aio_pika.abc.AbstractConnection] = <class 'aio_pika.connection.Connection'>, **kwargs: ~typing.Any) AbstractConnection [source]#
Make connection to the broker.
Example:
import aio_pika

async def main():
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )
Connect to localhost with default credentials:
import aio_pika

async def main():
    connection = await aio_pika.connect()
Note
- The available keys for ssl_options parameter are:
cert_reqs
certfile
keyfile
ssl_version
For information on what the ssl_options can be set to, refer to the official Python documentation.
Set connection name for RabbitMQ admin panel:
# As URL parameter method
read_connection = await connect(
    "amqp://guest:guest@localhost/?name=Read%20connection"
)

write_connection = await connect(
    client_properties={
        'connection_name': 'Write connection'
    }
)
The URL string might contain ssl parameters, e.g. amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem
- Parameters
client_properties – add custom client capability.
url – RFC3986 formatted broker address. When None, keyword arguments will be used.
host – hostname of the broker
port – broker port, 5672 by default
login – username string. ‘guest’ by default.
password – password string. ‘guest’ by default.
virtualhost – virtualhost parameter. ‘/’ by default
ssl – use SSL for connection. Should be used with additional kwargs.
ssl_options – A dict of values for the SSL connection.
timeout – connection timeout in seconds
loop – Event loop (asyncio.get_event_loop() when None)
ssl_context – ssl.SSLContext instance
connection_class – Factory of a new connection
kwargs – additional parameters which will be passed to the connection.
- Returns
- async aio_pika.connect_robust(url: ~typing.Optional[~typing.Union[str, ~yarl.URL]] = None, *, host: str = 'localhost', port: int = 5672, login: str = 'guest', password: str = 'guest', virtualhost: str = '/', ssl: bool = False, loop: ~typing.Optional[~asyncio.events.AbstractEventLoop] = None, ssl_options: ~typing.Optional[~aio_pika.abc.SSLOptions] = None, ssl_context: ~typing.Optional[~ssl.SSLContext] = None, timeout: ~typing.Optional[~typing.Union[float, int]] = None, client_properties: ~typing.Optional[~typing.Dict[str, FieldValue]] = None, connection_class: ~typing.Type[~aio_pika.abc.AbstractRobustConnection] = <class 'aio_pika.robust_connection.RobustConnection'>, **kwargs: ~typing.Any) AbstractRobustConnection [source]#
Make connection to the broker.
Example:
import aio_pika

async def main():
    connection = await aio_pika.connect(
        "amqp://guest:guest@127.0.0.1/"
    )
Connect to localhost with default credentials:
import aio_pika

async def main():
    connection = await aio_pika.connect()
Note
- The available keys for ssl_options parameter are:
cert_reqs
certfile
keyfile
ssl_version
For information on what the ssl_options can be set to, refer to the official Python documentation.
Set connection name for RabbitMQ admin panel:
# As URL parameter method
read_connection = await connect(
    "amqp://guest:guest@localhost/?name=Read%20connection"
)

# keyword method
write_connection = await connect(
    client_properties={
        'connection_name': 'Write connection'
    }
)
The URL string might contain ssl parameters, e.g. amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem
- Parameters
client_properties – add custom client capability.
url – RFC3986 formatted broker address. When None, keyword arguments will be used.
host – hostname of the broker
port – broker port, 5672 by default
login – username string. ‘guest’ by default.
password – password string. ‘guest’ by default.
virtualhost – virtualhost parameter. ‘/’ by default
ssl – use SSL for connection. Should be used with additional kwargs.
ssl_options – A dict of values for the SSL connection.
timeout – connection timeout in seconds
loop – Event loop (asyncio.get_event_loop() when None)
ssl_context – ssl.SSLContext instance
connection_class – Factory of a new connection
kwargs – additional parameters which will be passed to the connection.
- Returns
- aio_pika.patterns.base#
alias of <module ‘aio_pika.patterns.base’>
- class aio_pika.patterns.Master(channel: AbstractChannel, requeue: bool = True, reject_on_redelivered: bool = False)[source]#
Implements Master/Worker pattern. Usage example:
worker.py
master = Master(channel)
worker = await master.create_worker('test_worker', lambda x: print(x))
master.py
master = Master(channel)
await master.proxy.test_worker('foo')
Creates a new Master instance.
- Parameters
channel – Initialized instance of aio_pika.Channel
- async create_task(channel_name: str, kwargs: Mapping[str, Any] = mappingproxy({}), **message_kwargs: Any) Optional[Union[Ack, Nack, Reject]] [source]#
Creates a new task for the worker
- async create_worker(channel_name: str, func: Callable[[...], Any], **kwargs: Any) Worker [source]#
Creates a new
Worker
instance.
- class aio_pika.patterns.Worker(queue: AbstractQueue, consumer_tag: str, loop: AbstractEventLoop)[source]#
- class aio_pika.patterns.RPC(channel: AbstractChannel)[source]#
Remote Procedure Call helper.
Create an instance
rpc = await RPC.create(channel)
Registering python function
# RPC instance passes only keyword arguments
def multiply(*, x, y):
    return x * y

await rpc.register("multiply", multiply)
Call function through proxy
assert await rpc.proxy.multiply(x=2, y=3) == 6
Call function explicit
assert await rpc.call('multiply', dict(x=2, y=3)) == 6
- async call(method_name: str, kwargs: Optional[Dict[str, Any]] = None, *, expiration: Optional[int] = None, priority: int = 5, delivery_mode: DeliveryMode = DeliveryMode.NOT_PERSISTENT) Any [source]#
Call remote method and awaiting result.
- Parameters
method_name – Name of method
kwargs – Method kwargs
expiration – If not None, messages that stay in the queue longer than this will be returned and asyncio.TimeoutError will be raised.
priority – Message priority
delivery_mode – Call message delivery mode
- Raises
asyncio.TimeoutError – when message expired
CancelledError – when RPC.cancel() is called
RuntimeError – internal error
- async classmethod create(channel: AbstractChannel, **kwargs: Any) RPC [source]#
Creates a new instance of aio_pika.patterns.RPC. You should use this method instead of __init__(), because create() returns a coroutine and performs async initialization.
- Parameters
channel – initialized instance of aio_pika.Channel
- Returns
- async execute(func: Callable[[...], T], payload: Dict[str, Any]) T [source]#
Executes the RPC call. Might be overridden in subclasses.
- async register(method_name: str, func: Callable[[...], T], **kwargs: Any) Any [source]#
Creates a queue with a name equal to the method_name argument, then subscribes to this queue.
- Parameters
method_name – Method name
func – target function. Function MUST accept only keyword arguments.
kwargs – arguments which will be passed to queue_declare
- Raises
RuntimeError – Function already registered in this
RPC
instance or method_name already used.
Thanks for contributing#
@mosquito (author)
@decaz (steely persuasiveness during code review)
@heckad (bug fixes)
@smagafurov (bug fixes)
@hellysmile (bug fixes and ideas)
@altvod (bug fixes)
@alternativehood (bugfixes)
@cprieto (bug fixes)
@akhoronko (bug fixes)
@iselind (bug fixes)
@DXist (bug fixes)
@blazewicz (bug fixes)
@chibby0ne (bug fixes)
@jmccarrell (bug fixes)
@taybin (bug fixes)
@ollamh (bug fixes)
@DriverX (bug fixes)
@brianmedigate (bug fixes)
@dan-stone (bug fixes)
@Kludex (bug fixes)
@bmario (bug fixes)
@tzoiker (bug fixes)
@Pehat (bug fixes)
@WindowGenerator (bug fixes)
@dhontecillas (bug fixes)
@tilsche (bug fixes)
@leenr (bug fixes)
@la0rg (bug fixes)
@SolovyovAlexander (bug fixes)
@kremius (bug fixes)
@zyp (bug fixes)
@kajetanj (bug fixes)
@Alviner (moral support, debug sessions and good mood)
@Pavkazzz (composure and patience during debug sessions)
@bbrodriges (supplying grammar while writing documentation)
@dizballanze (review, grammar)
Versioning#
This software follows Semantic Versioning