# Source code for aio_pika.message

from datetime import datetime, timedelta
from enum import IntEnum, unique
from functools import singledispatch
from logging import getLogger
from pprint import pformat
from typing import Union, Optional

from aio_pika.pika import BasicProperties
from aio_pika.pika.channel import Channel
from contextlib import contextmanager
from .exceptions import MessageProcessError


# Module-level logger, named after this module.
log = getLogger(__name__)
# The type of ``None``, kept under a name so it can be used in isinstance() checks.
NoneType = type(None)


@unique
class DeliveryMode(IntEnum):
    """ Delivery mode of a message, as an integer enumeration.

    ``@unique`` guarantees that no two members share a value.
    """

    # Value 1
    NOT_PERSISTENT = 1
    # Value 2
    PERSISTENT = 2
# Every kind of value accepted wherever a timestamp/expiration is expected.
DateType = Union[int, datetime, float, timedelta, None]


@singledispatch
def convert_timestamp(value) -> Optional[int]:
    """ Convert a time-like value to a whole number of seconds.

    Dispatches on the type of ``value``:

    * :class:`int` is returned unchanged
    * :class:`float` is truncated with :func:`int`
    * :class:`datetime.timedelta` becomes its total seconds, truncated
    * :class:`datetime.datetime` becomes the number of seconds between
      *now* and ``value`` (negative when ``value`` is in the past)
    * ``None`` is returned as ``None``

    :param value: value to convert
    :return: seconds as :class:`int` or ``None``
    :raises ValueError: when the type of ``value`` is not supported
    """
    raise ValueError('Invalid timestamp type: %r' % type(value), value)


@convert_timestamp.register(datetime)
def _convert_datetime(value):
    # An aware datetime cannot be subtracted from a naive "now" (TypeError),
    # so take "now" in the same timezone as the value. Naive values keep
    # being compared against the naive local time.
    if value.utcoffset() is not None:
        now = datetime.now(value.tzinfo)
    else:
        now = datetime.now()

    return int((value - now).total_seconds())


@convert_timestamp.register(int)
def _convert_int(value):
    return value


@convert_timestamp.register(float)
def _convert_numbers(value):
    return int(value)


@convert_timestamp.register(timedelta)
def _convert_timedelta(value):
    return int(value.total_seconds())


@convert_timestamp.register(type(None))
def _convert_none(_):
    return None
class Message:
    """ AMQP message abstraction

    Holds the message body together with its AMQP properties. After
    :meth:`lock` has been called, attributes whose name does not start
    with an underscore can no longer be assigned.
    """

    __slots__ = (
        "body", "headers", "content_type", "content_encoding", "body_size",
        "delivery_mode", "priority", "correlation_id", "reply_to",
        "expiration", "message_id", "timestamp", "type", "user_id", "app_id",
        "__lock"
    )

    def __init__(self, body: bytes, *, headers: dict=None,
                 content_type: str=None, content_encoding: str=None,
                 delivery_mode: DeliveryMode=None,
                 priority: int=None, correlation_id=None,
                 reply_to: str=None, expiration: DateType=None,
                 message_id: str=None,
                 timestamp: DateType=None,
                 type: str=None, user_id: str=None,
                 app_id: str=None):

        """ Creates a new instance of Message

        :param body: message body
        :param headers: message headers
        :param content_type: content type
        :param content_encoding: content encoding
        :param delivery_mode: delivery mode
            (``DeliveryMode.NOT_PERSISTENT`` when omitted)
        :param priority: priority
        :param correlation_id: correlation id (stored as bytes)
        :param reply_to: reply to
        :param expiration: expiration in seconds (or datetime or timedelta)
        :param message_id: message id
        :param timestamp: timestamp
        :param type: type
        :param user_id: user id
        :param app_id: app id
        """

        # The lock flag has to exist before any public attribute is assigned:
        # __setattr__ reads it for every name without a leading underscore.
        self.__lock = False
        self.body = body if isinstance(body, bytes) else bytes(body)
        self.body_size = len(self.body) if self.body else 0
        self.headers = headers
        self.content_type = content_type
        self.content_encoding = content_encoding
        # Stored as the plain integer value of the enum member.
        self.delivery_mode = DeliveryMode(
            int(delivery_mode or DeliveryMode.NOT_PERSISTENT)
        ).value
        self.priority = priority
        self.correlation_id = self._as_bytes(correlation_id)
        self.reply_to = reply_to
        # Kept as given; it is only converted when ``properties`` is built.
        self.expiration = expiration
        self.message_id = message_id
        # NOTE(review): convert_timestamp turns a datetime into seconds
        # relative to "now", so a datetime passed here is not stored as an
        # absolute time - confirm this is intended.
        self.timestamp = convert_timestamp(timestamp)
        self.type = type
        self.user_id = str(user_id) if user_id else None
        self.app_id = str(app_id) if app_id else None

    @staticmethod
    def _as_bytes(value):
        """ Coerce ``value`` to bytes: bytes pass through, str is encoded,
        ``None`` becomes ``b''`` and anything else is encoded from ``str(value)``.
        """
        if isinstance(value, bytes):
            return value
        elif isinstance(value, str):
            return value.encode()
        elif isinstance(value, NoneType):
            return b''
        else:
            return str(value).encode()

    def info(self) -> dict:
        """ Create a dict with message attributes

        ::

            {
                "body_size": 100,
                "headers": {},
                "content_type": "text/plain",
                "content_encoding": "",
                "delivery_mode": DeliveryMode.NOT_PERSISTENT,
                "priority": 0,
                "correlation_id": "",
                "reply_to": "",
                "expiration": "",
                "message_id": "",
                "timestamp": "",
                "type": "",
                "user_id": "",
                "app_id": "",
            }

        The body itself is not included, only its size.
        """

        return {
            "body_size": self.body_size,
            "headers": self.headers,
            "content_type": self.content_type,
            "content_encoding": self.content_encoding,
            "delivery_mode": self.delivery_mode,
            "priority": self.priority,
            "correlation_id": self.correlation_id,
            "reply_to": self.reply_to,
            "expiration": self.expiration,
            "message_id": self.message_id,
            "timestamp": self.timestamp,
            "type": str(self.type),
            "user_id": self.user_id,
            "app_id": self.app_id,
        }

    @property
    def locked(self) -> bool:
        """ is message locked

        :return: :class:`bool`
        """
        return bool(self.__lock)

    @property
    def properties(self) -> BasicProperties:
        """ Build :class:`pika.BasicProperties` object

        ``expiration`` (seconds, timedelta or datetime) is scaled by 1000
        and passed as a string; a falsy expiration is passed as ``None``.
        """
        expiration = None

        if self.expiration:
            remaining = self.expiration

            if isinstance(remaining, datetime):
                # A datetime cannot be multiplied by a number, so an
                # absolute deadline is first turned into the timedelta
                # remaining from now (in the deadline's own timezone when
                # it is timezone-aware).
                if remaining.utcoffset() is not None:
                    now = datetime.now(remaining.tzinfo)
                else:
                    now = datetime.now()

                remaining = remaining - now

            expiration = str(convert_timestamp(remaining * 1000))

        return BasicProperties(
            content_type=self.content_type,
            content_encoding=self.content_encoding,
            headers=self.headers,
            delivery_mode=self.delivery_mode,
            priority=self.priority,
            correlation_id=self.correlation_id,
            reply_to=self.reply_to,
            expiration=expiration,
            message_id=self.message_id,
            timestamp=self.timestamp,
            type=self.type,
            user_id=self.user_id,
            app_id=self.app_id
        )

    def __repr__(self):
        return "{name}:{repr}".format(
            name=self.__class__.__name__,
            repr=pformat(self.info())
        )

    def __setattr__(self, key, value):
        # Underscore-prefixed names (including the name-mangled lock flag)
        # stay writable so internal state can change on a locked message.
        if not key.startswith("_") and self.locked:
            raise ValueError("Message is locked")

        return super().__setattr__(key, value)

    def __iter__(self):
        """ Iterate over the message body """
        return iter(self.body)

    def lock(self):
        """ Set lock flag to `True`"""
        self.__lock = True

    def __copy__(self):
        """ Return a new, unlocked :class:`Message` built from this one.

        Headers are copied into a new dict (an empty dict when this message
        has no headers); the result is always a plain :class:`Message`.
        """
        return Message(
            body=self.body,
            headers={
                k: v for k, v in self.headers.items()
            } if self.headers else {},
            content_encoding=self.content_encoding,
            content_type=self.content_type,
            delivery_mode=self.delivery_mode,
            priority=self.priority,
            correlation_id=self.correlation_id,
            reply_to=self.reply_to,
            expiration=self.expiration,
            message_id=self.message_id,
            timestamp=self.timestamp,
            type=self.type,
            user_id=self.user_id,
            app_id=self.app_id
        )
class IncomingMessage(Message):
    """ Incoming message. It looks like :class:`Message` but has additional
    methods for message acknowledgement.

    Depending on the acknowledgement mode used, RabbitMQ can consider a
    message to be successfully delivered either immediately after it is sent
    out (written to a TCP socket) or when an explicit ("manual") client
    acknowledgement is received. Manually sent acknowledgements can be
    positive or negative and use one of the following protocol methods:

    * basic.ack is used for positive acknowledgements
    * basic.nack is used for negative acknowledgements (note: this is a
      RabbitMQ extension to AMQP 0-9-1)
    * basic.reject is used for negative acknowledgements but has one
      limitation compared to basic.nack

    Positive acknowledgements simply instruct RabbitMQ to record a message
    as delivered. Negative acknowledgements with basic.reject have the same
    effect. The difference is primarily in the semantics: positive
    acknowledgements assume a message was successfully processed while their
    negative counterpart suggests that a delivery wasn't processed but still
    should be deleted.

    """

    __slots__ = (
        '_loop', '__channel', 'cluster_id', 'consumer_tag',
        'delivery_tag', 'exchange', 'routing_key', 'synchronous',
        'redelivered', '__no_ack', '__processed'
    )

    def __init__(self, channel: Channel, envelope, properties, body,
                 no_ack: bool=False):
        """ Create an instance of :class:`IncomingMessage`

        :param channel: :class:`aio_pika.channel.Channel`
        :param envelope: pika envelope
        :param properties: properties
        :param body: message body
        :param no_ack: no ack needed
        """
        self.__channel = channel
        self.__no_ack = no_ack
        self.__processed = False

        # The received expiration is divided by 1000 before it is handed to
        # Message, which documents its expiration as seconds.
        ttl = None
        if properties.expiration:
            ttl = convert_timestamp(float(properties.expiration))
        ttl_seconds = ttl / 1000. if ttl else None

        received_at = None
        if properties.timestamp:
            received_at = convert_timestamp(float(properties.timestamp))

        super().__init__(
            body=body,
            content_type=properties.content_type,
            content_encoding=properties.content_encoding,
            headers=properties.headers,
            delivery_mode=properties.delivery_mode,
            priority=properties.priority,
            correlation_id=properties.correlation_id,
            reply_to=properties.reply_to,
            expiration=ttl_seconds,
            message_id=properties.message_id,
            timestamp=received_at,
            type=properties.type,
            user_id=properties.user_id,
            app_id=properties.app_id,
        )

        self.cluster_id = properties.cluster_id
        self.consumer_tag = getattr(envelope, 'consumer_tag', None)
        self.delivery_tag = getattr(envelope, 'delivery_tag', None)
        self.exchange = envelope.exchange
        self.routing_key = envelope.routing_key
        self.redelivered = getattr(envelope, 'redelivered', None)
        self.synchronous = envelope.synchronous

        # Without a delivery tag, or in no_ack mode, there is nothing to
        # acknowledge: the message starts out locked and already processed.
        if no_ack or not self.delivery_tag:
            self.lock()
            self.__processed = True

    def _mark_processed(self):
        """ Record that the message has been settled and lock it """
        self.__processed = True

        if not self.locked:
            self.lock()

    @contextmanager
    def process(self, requeue=False, reject_on_redelivered=False,
                ignore_processed=False):
        """ Context manager for processing the message

        The message is acked when the block finishes normally and rejected
        when the block raises; the exception is then re-raised.

            >>> def on_message_received(message: IncomingMessage):
            ...    with message.process():
            ...        # When exception will be raised
            ...        # the message will be rejected
            ...        print(message.body)

        Example with ignore_processed=True

            >>> def on_message_received(message: IncomingMessage):
            ...    with message.process(ignore_processed=True):
            ...        # Now (with ignore_processed=True) you may reject
            ...        # (or ack) message manually too
            ...        if True:  # some reasonable condition here
            ...            message.reject()
            ...        print(message.body)

        :param requeue: Requeue message when exception.
        :param reject_on_redelivered: When True, a message that was
            redelivered is rejected without requeue, whatever ``requeue`` is.
        :param ignore_processed: Do nothing if message already processed

        """
        try:
            yield self

            if not (ignore_processed and self.processed):
                self.ack()
        except BaseException:
            if not (ignore_processed and self.processed):
                if reject_on_redelivered and self.redelivered:
                    log.info(
                        "Message %r was redelivered and will be rejected",
                        self
                    )
                    self.reject(requeue=False)
                else:
                    self.reject(requeue=requeue)
            raise

    def ack(self, multiple: bool=False):
        """ Send basic.ack, which is used for positive acknowledgements

        .. note::
            This method looks like a blocking method, but it just sends
            bytes to the socket and does not require any response from
            the broker.

        :param multiple: If set to True, the message's delivery tag is
                         treated as "up to and including", so that multiple
                         messages can be acknowledged with a single method.
                         If set to False, the ack refers to a single message.
        :return: None
        :raises TypeError: when the message was received with ``no_ack``
        :raises MessageProcessError: when the message was already processed
        """
        if self.__no_ack:
            raise TypeError("Can't ack message with \"no_ack\" flag")

        if self.__processed:
            raise MessageProcessError("Message already processed")

        self.__channel.basic_ack(
            delivery_tag=self.delivery_tag, multiple=multiple
        )
        self._mark_processed()

    def reject(self, requeue=False):
        """ Send basic.reject. When `requeue=True` the message will be
        returned to the queue. Otherwise the message will be dropped.

        .. note::
            This method looks like a blocking method, but it just sends
            bytes to the socket and does not require any response from
            the broker.

        :param requeue: bool
        :raises TypeError: when the message was received with ``no_ack``
        :raises MessageProcessError: when the message was already processed
        """
        if self.__no_ack:
            raise TypeError('This message has "no_ack" flag.')

        if self.__processed:
            raise MessageProcessError("Message already processed")

        self.__channel.basic_reject(
            delivery_tag=self.delivery_tag, requeue=requeue
        )
        self._mark_processed()

    def nack(self, multiple: bool=False, requeue: bool=True):
        """ Send basic.nack, which is used for negative acknowledgements

        :param multiple: passed through to ``basic_nack``
        :param requeue: passed through to ``basic_nack``
        :raises TypeError: when the message was received with ``no_ack``
        :raises MessageProcessError: when the message was already processed
        """
        if self.__no_ack:
            raise TypeError("Can't nack message with \"no_ack\" flag")

        if self.__processed:
            raise MessageProcessError("Message already processed")

        self.__channel.basic_nack(
            delivery_tag=self.delivery_tag,
            multiple=multiple,
            requeue=requeue
        )
        self._mark_processed()

    def info(self) -> dict:
        """ Method returns dict representation of the message """
        details = super().info()
        details['cluster_id'] = self.cluster_id
        details['consumer_tag'] = self.consumer_tag
        details['delivery_tag'] = self.delivery_tag
        details['exchange'] = self.exchange
        details['redelivered'] = self.redelivered
        details['routing_key'] = self.routing_key
        details['synchronous'] = self.synchronous
        return details

    @property
    def processed(self):
        """ ``True`` once the message was acked, rejected or nacked, or
        when it never needed an acknowledgement """
        return self.__processed
class ReturnedMessage(IncomingMessage):
    """ An :class:`IncomingMessage` subclass that adds no behaviour of its own.

    NOTE(review): presumably used for messages the broker returns to the
    publisher - confirm against the code that instantiates it.
    """


# Names exported by ``from aio_pika.message import *``.
__all__ = ('Message', 'IncomingMessage', 'ReturnedMessage')