Patterns and helpers¶
Note
Available since aio-pika>=1.7.0
aio-pika includes some useful patterns for creating distributed systems.
Master/Worker¶
Helper which implements the Master/Worker pattern. It is applicable for balancing tasks between multiple workers.
The master creates tasks:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master
async def main():
    """Connect to the broker and publish ``my_task_name`` tasks.

    The tasks are queued in two ways: through the ``master.proxy``
    attribute and through the ``master.create_task`` method.
    """
    connection = await connect_robust("amqp://guest:guest@127.0.0.1/")

    async with connection:
        # A Master is built on top of a freshly opened channel
        master = Master(await connection.channel())

        # Variant 1: queue tasks through the proxy object
        for number in range(1000):
            await master.proxy.my_task_name(task_id=number)

        # Variant 2: queue tasks through the create_task method
        for number in range(1000):
            await master.create_task(
                "my_task_name", kwargs={"task_id": number}
            )
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion
    # and closes the loop afterwards. Calling asyncio.get_event_loop()
    # while no loop is running is deprecated.
    asyncio.run(main())
Worker code:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import Master, RejectMessage, NackMessage
async def worker(*, task_id):
    """Example task handler receiving ``task_id`` as a keyword argument.

    A handler which wants the message to be rejected or nacked raises
    the special ``RejectMessage`` / ``NackMessage`` exceptions.
    """
    if task_id % 2 == 0:
        # Even ids: reject the message, do not requeue it
        raise RejectMessage(requeue=False)
    elif task_id % 2 == 1:
        # Odd ids: nack the message, do not requeue it
        raise NackMessage(requeue=False)

    # NOTE: for integer ids one of the branches above always raises, so
    # this line is only reached for non-integer values.
    print(task_id)
async def main():
    """Connect to the broker, register ``worker`` and return the connection.

    The connection is not closed here; it is handed back to the caller.
    """
    connection = await connect_robust("amqp://guest:guest@127.0.0.1/")

    # A Master is built on top of a freshly opened channel
    master = Master(await connection.channel())

    # Register ``worker`` as the handler for "my_task_name" tasks
    await master.create_worker("my_task_name", worker, auto_delete=True)

    return connection
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when it is called while no event loop is running.
    loop = asyncio.new_event_loop()
    connection = loop.run_until_complete(main())

    try:
        # Keep processing incoming tasks until the process is interrupted
        loop.run_forever()
    finally:
        loop.run_until_complete(connection.close())
        # Release the resources held by the loop created above
        loop.close()
One or multiple workers execute the tasks.
RPC¶
Helper which implements the Remote Procedure Call pattern. It is applicable for balancing tasks between multiple workers.
The caller creates tasks and awaits the results:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def main():
    """Call the remote ``multiply`` procedure and print every result.

    The procedure is invoked in two ways: through the ``rpc.proxy``
    attribute and through the ``rpc.call`` method.
    """
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/",
        client_properties={"connection_name": "caller"},
    )

    async with connection:
        # The RPC helper is built on top of a freshly opened channel
        rpc = await RPC.create(await connection.channel())

        # Variant 1: call through the proxy object
        for factor in range(1000):
            result = await rpc.proxy.multiply(x=100, y=factor)
            print(result)

        # Variant 2: call through the call method
        for factor in range(1000):
            result = await rpc.call(
                "multiply", kwargs={"x": 100, "y": factor}
            )
            print(result)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() to completion
    # and closes the loop afterwards. Calling asyncio.get_event_loop()
    # while no loop is running is deprecated.
    asyncio.run(main())
One or multiple callees execute the tasks:
import asyncio
from aio_pika import connect_robust
from aio_pika.patterns import RPC
async def multiply(*, x, y):
    """Return ``x * y``; both arguments are keyword-only."""
    product = x * y
    return product
async def main():
    """Connect to the broker, register ``multiply`` and return the connection.

    The connection is not closed here; it is handed back to the caller.
    """
    connection = await connect_robust(
        "amqp://guest:guest@127.0.0.1/",
        client_properties={"connection_name": "callee"},
    )

    # The RPC helper is built on top of a freshly opened channel
    rpc = await RPC.create(await connection.channel())

    # Expose ``multiply`` under the name "multiply"
    await rpc.register("multiply", multiply, auto_delete=True)

    return connection
if __name__ == "__main__":
    # Create the loop explicitly: asyncio.get_event_loop() is deprecated
    # when it is called while no event loop is running.
    loop = asyncio.new_event_loop()
    connection = loop.run_until_complete(main())

    try:
        # Keep serving incoming calls until the process is interrupted
        loop.run_forever()
    finally:
        loop.run_until_complete(connection.close())
        # shutdown_asyncgens() is a coroutine: it has to be run by the
        # loop. A bare call only creates a coroutine object which is
        # never executed.
        loop.run_until_complete(loop.shutdown_asyncgens())
        # Release the resources held by the loop created above
        loop.close()
Extending¶
The serialization behaviour of both patterns can be changed by inheriting from them and
redefining the methods aio_pika.patterns.base.serialize()
and aio_pika.patterns.base.deserialize().
The following examples demonstrate it:
import gzip
import json
from typing import Any
from aio_pika.patterns import RPC, Master
class JsonMaster(Master):
    """Master which serializes payloads as JSON."""

    # deserializer will use SERIALIZER.loads(body)
    SERIALIZER = json
    CONTENT_TYPE = "application/json"

    def serialize(self, data: Any) -> bytes:
        """Return ``data`` encoded as UTF-8 JSON bytes."""
        # json.dumps() returns str; encode it so that the declared
        # ``bytes`` return type actually holds.
        text = self.SERIALIZER.dumps(data, ensure_ascii=False)
        return text.encode("utf-8")
class JsonRPC(RPC):
    """RPC which serializes payloads and exceptions as JSON."""

    SERIALIZER = json
    CONTENT_TYPE = "application/json"

    def serialize(self, data: Any) -> bytes:
        """Return ``data`` encoded as UTF-8 JSON bytes.

        Objects which JSON cannot represent are replaced by their
        ``repr()`` (``default=repr``).
        """
        # json.dumps() returns str; encode it so that the declared
        # ``bytes`` return type holds and bytes-only consumers such as
        # gzip.compress() accept the result.
        text = self.SERIALIZER.dumps(data, ensure_ascii=False, default=repr)
        return text.encode("utf-8")

    def serialize_exception(self, exception: Exception) -> bytes:
        """Return ``exception`` serialized as an ``{"error": ...}`` object.

        The error object carries the exception class name, its
        ``repr()`` and its ``args``.
        """
        return self.serialize(
            {
                "error": {
                    "type": exception.__class__.__name__,
                    "message": repr(exception),
                    "args": exception.args,
                }
            }
        )
class JsonGZipRPC(JsonRPC):
    """JsonRPC variant which gzip-compresses the serialized payload."""

    CONTENT_TYPE = "application/octet-stream"

    def serialize(self, data: Any) -> bytes:
        """Return the parent's serialization of ``data``, gzip-compressed."""
        # gzip.compress() only accepts bytes-like input, so the parent
        # serialize() has to return bytes.
        return gzip.compress(super().serialize(data))

    def deserialize(self, data: bytes) -> Any:
        """Decompress ``data`` and pass it to the parent's ``deserialize``."""
        return super().deserialize(gzip.decompress(data))