Source code for aio_pika.connection

import asyncio
from ssl import SSLContext
from types import TracebackType
from typing import Any, Callable, Dict, Optional, Tuple, Type, TypeVar, Union

from import censor_url
from pamqp.common import FieldTable
from yarl import URL

from .abc import (
    AbstractChannel, AbstractConnection, SSLOptions, TimeoutType,
from .channel import Channel
from .log import get_logger
from .tools import CallbackCollection

log = get_logger(__name__)
T = TypeVar("T")

[docs]class Connection(AbstractConnection): """ Connection abstraction """ CHANNEL_CLASS: Type[Channel] = Channel KWARGS_TYPES: Tuple[Tuple[str, Callable[[str], Any], str], ...] = () _closed: bool @property def is_closed(self) -> bool: return self._closed async def close( self, exc: Optional[] = asyncio.CancelledError, ) -> None: transport, self.transport = self.transport, None self._close_called = True if not transport: return await transport.close(exc) self._closed = True @classmethod def _parse_kwargs(cls, kwargs: Dict[str, Any]) -> Dict[str, Any]: result = {} for key, parser, default in cls.KWARGS_TYPES: result[key] = parser(kwargs.get(key, default)) return result def __init__( self, url: URL, loop: Optional[asyncio.AbstractEventLoop] = None, ssl_context: Optional[SSLContext] = None, **kwargs: Any ): self.loop = loop or asyncio.get_event_loop() self.transport = None self._closed = False self._close_called = False self.url = URL(url) self.kwargs: Dict[str, Any] = self._parse_kwargs( kwargs or dict(self.url.query), ) self.kwargs["context"] = ssl_context self.close_callbacks = CallbackCollection(self) self.connected: asyncio.Event = asyncio.Event() def __str__(self) -> str: return str(censor_url(self.url)) def __repr__(self) -> str: return f'<{self.__class__.__name__}: "{self}">' async def _on_connection_close(self, closing: asyncio.Future) -> None: exc: Optional[BaseException] = closing.exception() self.connected.clear() await self.close_callbacks(exc) async def _on_connected(self, transport: UnderlayConnection) -> None: self.connected.set()
[docs] async def connect(self, timeout: TimeoutType = None) -> None: """ Connect to AMQP server. This method should be called after :func:`aio_pika.connection.Connection.__init__` .. note:: This method is called by :func:`connect`. You shouldn't call it explicitly. """ transport = await UnderlayConnection.connect( self.url, self._on_connection_close, timeout=timeout, **self.kwargs ) await self._on_connected(transport) self.transport = transport
[docs] def channel( self, channel_number: Optional[int] = None, publisher_confirms: bool = True, on_return_raises: bool = False, ) -> AbstractChannel: """ Coroutine which returns new instance of :class:`Channel`. Example: .. code-block:: python import aio_pika async def main(loop): connection = await aio_pika.connect( "amqp://guest:guest@" ) channel1 = await channel1.close() # Creates channel with specific channel number channel42 = await channel42.close() # For working with transactions channel_no_confirms = await publisher_confirms=False ) await channel_no_confirms.close() Also available as an asynchronous context manager: .. code-block:: python import aio_pika async def main(loop): connection = await aio_pika.connect( "amqp://guest:guest@" ) async with as channel: # channel is open and available # channel is now closed :param channel_number: specify the channel number explicit :param publisher_confirms: if `True` the :func:`aio_pika.Exchange.publish` method will be return :class:`bool` after publish is complete. Otherwise the :func:`aio_pika.Exchange.publish` method will be return :class:`None` :param on_return_raises: raise an :class:`aio_pika.exceptions.DeliveryError` when mandatory message will be returned """ if not self.transport: raise RuntimeError("Connection was not opened") log.debug("Creating AMQP channel for connection: %r", self) channel = self.CHANNEL_CLASS( connection=self.transport.connection, channel_number=channel_number, publisher_confirms=publisher_confirms, on_return_raises=on_return_raises, ) log.debug("Channel created: %r", channel) return channel
async def ready(self) -> None: await self.connected.wait() def __del__(self) -> None: if ( self.is_closed or self.loop.is_closed() or not hasattr(self, "connection") ): return asyncio.ensure_future(self.close()) async def __aenter__(self) -> "Connection": return self async def __aexit__( self, exc_type: Optional[Type[BaseException]], exc_val: Optional[BaseException], exc_tb: Optional[TracebackType], ) -> None: await self.close() async def update_secret( self, new_secret: str, *, reason: str = "", timeout: TimeoutType = None, ) -> aiormq.spec.Connection.UpdateSecretOk: if self.transport is None: raise RuntimeError("Connection is not ready") result = await self.transport.connection.update_secret( new_secret=new_secret, reason=reason, timeout=timeout, ) self.url = self.url.with_password(new_secret) return result
def make_url( url: Union[str, URL, None] = None, *, host: str = "localhost", port: int = 5672, login: str = "guest", password: str = "guest", virtualhost: str = "/", ssl: bool = False, ssl_options: Optional[SSLOptions] = None, client_properties: Optional[FieldTable] = None, **kwargs: Any, ) -> URL: if url is not None: if not isinstance(url, URL): return URL(url) return url kw = kwargs kw.update(ssl_options or {}) kw.update(client_properties or {}) # sanitize keywords kw = {k: v for k, v in kw.items() if v is not None} return scheme="amqps" if ssl else "amqp", host=host, port=port, user=login, password=password, # yarl >= 1.3.0 requires path beginning with slash path="/" + virtualhost, query=kw, )
[docs]async def connect( url: Union[str, URL, None] = None, *, host: str = "localhost", port: int = 5672, login: str = "guest", password: str = "guest", virtualhost: str = "/", ssl: bool = False, loop: Optional[asyncio.AbstractEventLoop] = None, ssl_options: Optional[SSLOptions] = None, ssl_context: Optional[SSLContext] = None, timeout: TimeoutType = None, client_properties: Optional[FieldTable] = None, connection_class: Type[AbstractConnection] = Connection, **kwargs: Any ) -> AbstractConnection: """ Make connection to the broker. Example: .. code-block:: python import aio_pika async def main(): connection = await aio_pika.connect( "amqp://guest:guest@" ) Connect to localhost with default credentials: .. code-block:: python import aio_pika async def main(): connection = await aio_pika.connect() .. note:: The available keys for ssl_options parameter are: * cert_reqs * certfile * keyfile * ssl_version For an information on what the ssl_options can be set to reference the `official Python documentation`_ . Set connection name for RabbitMQ admin panel: .. code-block:: python # As URL parameter method read_connection = await connect( "amqp://guest:guest@localhost/?name=Read%20connection" ) write_connection = await connect( client_properties={ 'connection_name': 'Write connection' } ) .. note: ``client_properties`` argument requires ``aiormq>=2.9`` URL string might be contain ssl parameters e.g. `amqps://user:pass@host//?ca_certs=ca.pem&certfile=crt.pem&keyfile=key.pem` :param client_properties: add custom client capability. :param url: RFC3986_ formatted broker address. When :class:`None` will be used keyword arguments. :param host: hostname of the broker :param port: broker port 5672 by default :param login: username string. `'guest'` by default. :param password: password string. `'guest'` by default. :param virtualhost: virtualhost parameter. `'/'` by default :param ssl: use SSL for connection. Should be used with addition kwargs. :param ssl_options: A dict of values for the SSL connection. :param timeout: connection timeout in seconds :param loop: Event loop (:func:`asyncio.get_event_loop()` when :class:`None`) :param ssl_context: ssl.SSLContext instance :param connection_class: Factory of a new connection :param kwargs: addition parameters which will be passed to the connection. :return: :class:`aio_pika.connection.Connection` .. _RFC3986: .. _official Python documentation: """ connection: AbstractConnection = connection_class( make_url( url, host=host, port=port, login=login, password=password, virtualhost=virtualhost, ssl=ssl, ssl_options=ssl_options, client_properties=client_properties, **kwargs ), loop=loop, ssl_context=ssl_context, ) await connection.connect(timeout=timeout) return connection
__all__ = ("connect", "Connection", "make_url")