From aeea0a752f0326164c3cd3828b38d40952013092 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 9 Aug 2018 11:17:29 -0700 Subject: [PATCH 1/4] Added send and auth timeouts --- azure/eventhub/_async/__init__.py | 17 ++++++++-- azure/eventhub/_async/sender_async.py | 29 ++++++++++++----- azure/eventhub/client.py | 47 ++++++++++++++++++++++++--- azure/eventhub/sender.py | 27 ++++++++++----- 4 files changed, 96 insertions(+), 24 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index c4bcadf..6868462 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -58,7 +58,7 @@ def _create_auth(self, username=None, password=None): # pylint: disable=no-self return authentication.SASLPlain( self.address.hostname, username, password, http_proxy=self.http_proxy) return authentication.SASTokenAsync.from_shared_access_key( - self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy) + self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy) async def _close_clients_async(self): """ @@ -79,6 +79,7 @@ async def _start_client_async(self, client): except Exception as exp: # pylint: disable=broad-except log.info("Encountered error while starting handler: {}".format(exp)) await client.close_async(exception=exp) + log.info("Finished closing failed handler") async def _handle_redirect(self, redirects): if len(redirects) != len(self.clients): @@ -224,7 +225,7 @@ def add_async_epoch_receiver( self.clients.append(handler) return handler - def add_async_sender(self, partition=None, operation=None, keep_alive=30, auto_reconnect=True, loop=None): + def add_async_sender(self, partition=None, operation=None, send_timeout=60, keep_alive=30, auto_reconnect=True, loop=None): """ Add an async sender to the client to send ~azure.eventhub.common.EventData object to an EventHub. 
@@ -236,13 +237,23 @@ def add_async_sender(self, partition=None, operation=None, keep_alive=30, auto_r :operation: An optional operation to be appended to the hostname in the target URL. The value must start with `/` character. :type operation: str + :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is + queued. Default value is 60 seconds. If set to 0, there will be no timeout. + :type send_timeout: int + :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during + periods of inactivity. The default value is 30 seconds. If set to `None`, the connection will not + be pinged. + :type keep_alive: int + :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. + Default value is `True`. + :type auto_reconnect: bool :rtype: ~azure.eventhub._async.sender_async.SenderAsync """ target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: target = target + operation handler = AsyncSender( - self, target, partition=partition, keep_alive=keep_alive, + self, target, partition=partition, send_timeout=send_timeout, keep_alive=keep_alive, auto_reconnect=auto_reconnect, loop=loop) self.clients.append(handler) return handler diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index 7dee78e..f170d08 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -18,7 +18,7 @@ class AsyncSender(Sender): Implements the async API of a Sender. """ - def __init__(self, client, target, partition=None, keep_alive=None, auto_reconnect=True, loop=None): # pylint: disable=super-init-not-called + def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True, loop=None): # pylint: disable=super-init-not-called """ Instantiate an EventHub event SenderAsync handler. 
@@ -26,7 +26,19 @@ def __init__(self, client, target, partition=None, keep_alive=None, auto_reconne :type client: ~azure.eventhub._async.EventHubClientAsync :param target: The URI of the EventHub to send to. :type target: str - :param loop: An event loop. + :param partition: The specific partition ID to send to. Default is `None`, in which case the service + will assign to all partitions using round-robin. + :type partition: str + :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is + queued. Default value is 60 seconds. If set to 0, there will be no timeout. + :type send_timeout: int + :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during + periods of inactivity. The default value is `None`, i.e. no keep alive pings. + :type keep_alive: int + :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. + Default value is `True`. + :type auto_reconnect: bool + :param loop: An event loop. If not specified the default event loop will be used. 
""" self.loop = loop or asyncio.get_event_loop() self.client = client @@ -34,6 +46,7 @@ def __init__(self, client, target, partition=None, keep_alive=None, auto_reconne self.partition = partition self.keep_alive = keep_alive self.auto_reconnect = auto_reconnect + self.timeout = send_timeout self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.name = "EHSender-{}".format(uuid.uuid4()) self.redirected = None @@ -45,7 +58,7 @@ def __init__(self, client, target, partition=None, keep_alive=None, auto_reconne self.target, auth=self.client.get_auth(), debug=self.client.debug, - msg_timeout=Sender.TIMEOUT, + msg_timeout=self.timeout, error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, @@ -69,7 +82,7 @@ async def open_async(self): self.target, auth=self.client.get_auth(), debug=self.client.debug, - msg_timeout=Sender.TIMEOUT, + msg_timeout=self.timeout, error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, @@ -82,22 +95,20 @@ async def open_async(self): async def reconnect_async(self): """If the Receiver was disconnected from the service with a retryable error - attempt to reconnect.""" - # pylint: disable=protected-access - pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) - unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] await self._handler.close_async() + unsent_events = self._handler.pending_messages self._handler = SendClientAsync( self.target, auth=self.client.get_auth(), debug=self.client.debug, - msg_timeout=Sender.TIMEOUT, + msg_timeout=self.timeout, error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, properties=self.client.create_properties(), loop=self.loop) await self._handler.open_async() - self._handler._pending_messages = unsent_events + self._handler.queue_message(*unsent_events) await self._handler.wait_async() async def 
has_started(self): diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 08f07bd..250ad7a 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -88,7 +88,7 @@ class EventHubClient(object): events to and receiving events from the Azure Event Hubs service. """ - def __init__(self, address, username=None, password=None, debug=False, http_proxy=None): + def __init__(self, address, username=None, password=None, debug=False, http_proxy=None, auth_timeout=0): """ Constructs a new EventHubClient with the given address URL. @@ -108,6 +108,9 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'. :type http_proxy: dict[str, Any] + :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. + :type auth_timeout: int """ self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] self.address = urlparse(address) @@ -124,6 +127,7 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox self._auth_config = {'username': username, 'password': password} self.get_auth = functools.partial(self._create_auth) self.debug = debug + self.auth_timeout = auth_timeout self.clients = [] self.stopped = False @@ -138,6 +142,16 @@ def from_connection_string(cls, conn_str, eventhub=None, **kwargs): :type conn_str: str :param eventhub: The name of the EventHub, if the EntityName is not included in the connection string. + :param debug: Whether to output network trace logs to the logger. Default + is `False`. + :type debug: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. 
+ :type http_proxy: dict[str, Any] + :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. + :type auth_timeout: int """ address, policy, key, entity = _parse_conn_str(conn_str) entity = eventhub or entity @@ -146,6 +160,22 @@ def from_connection_string(cls, conn_str, eventhub=None, **kwargs): @classmethod def from_iothub_connection_string(cls, conn_str, **kwargs): + """ + Create an EventHubClient from an IoTHub connection string. + + :param conn_str: The connection string. + :type conn_str: str + :param debug: Whether to output network trace logs to the logger. Default + is `False`. + :type debug: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :type http_proxy: dict[str, Any] + :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. + The default value is 60 seconds. 
+ :type auth_timeout: int + """ address, policy, key, _ = _parse_conn_str(conn_str) hub_name = address.split('.')[0] username = "{}@sas.root.{}".format(policy, hub_name) @@ -176,7 +206,7 @@ def _create_auth(self, username=None, password=None): return authentication.SASLPlain( self.address.hostname, username, password, http_proxy=self.http_proxy) return authentication.SASTokenAuth.from_shared_access_key( - self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy) + self.auth_uri, username, password, timeout=self.auth_timeout, http_proxy=self.http_proxy) def create_properties(self): # pylint: disable=no-self-use """ @@ -362,7 +392,7 @@ def add_epoch_receiver( self.clients.append(handler) return handler - def add_sender(self, partition=None, operation=None, keep_alive=30, auto_reconnect=True): + def add_sender(self, partition=None, operation=None, send_timeout=60, keep_alive=30, auto_reconnect=True): """ Add a sender to the client to send ~azure.eventhub.common.EventData object to an EventHub. @@ -374,11 +404,20 @@ def add_sender(self, partition=None, operation=None, keep_alive=30, auto_reconne :operation: An optional operation to be appended to the hostname in the target URL. The value must start with `/` character. :type operation: str + :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is + queued. Default value is 60 seconds. If set to 0, there will be no timeout. + :type send_timeout: int + :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during + periods of inactivity. The default value is 30 seconds. If set to `None`, the connection will not + be pinged. + :type keep_alive: int + :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. + Default value is `True`. 
:rtype: ~azure.eventhub.sender.Sender """ target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: target = target + operation - handler = Sender(self, target, partition=partition, keep_alive=keep_alive, auto_reconnect=auto_reconnect) + handler = Sender(self, target, partition=partition, send_timeout=send_timeout, keep_alive=keep_alive, auto_reconnect=auto_reconnect) self.clients.append(handler) return handler diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index b59ed70..c8eb544 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -15,9 +15,8 @@ class Sender: """ Implements a Sender. """ - TIMEOUT = 60.0 - def __init__(self, client, target, partition=None, keep_alive=None, auto_reconnect=True): + def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True): """ Instantiate an EventHub event Sender handler. @@ -25,10 +24,23 @@ def __init__(self, client, target, partition=None, keep_alive=None, auto_reconne :type client: ~azure.eventhub.client.EventHubClient. :param target: The URI of the EventHub to send to. :type target: str + :param partition: The specific partition ID to send to. Default is None, in which case the service + will assign to all partitions using round-robin. + :type partition: str + :param send_timeout: The timeout in seconds for an individual event to be sent from the time that it is + queued. Default value is 60 seconds. If set to 0, there will be no timeout. + :type send_timeout: int + :param keep_alive: The time interval in seconds between pinging the connection to keep it alive during + periods of inactivity. The default value is None, i.e. no keep alive pings. + :type keep_alive: int + :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. + Default value is `True`. 
+ :type auto_reconnect: bool """ self.client = client self.target = target self.partition = partition + self.timeout = send_timeout self.redirected = None self.error = None self.keep_alive = keep_alive @@ -42,7 +54,7 @@ def __init__(self, client, target, partition=None, keep_alive=None, auto_reconne self.target, auth=self.client.get_auth(), debug=self.client.debug, - msg_timeout=Sender.TIMEOUT, + msg_timeout=self.timeout, error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, @@ -65,7 +77,7 @@ def open(self): self.target, auth=self.client.get_auth(), debug=self.client.debug, - msg_timeout=Sender.TIMEOUT, + msg_timeout=self.timeout, error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, @@ -78,20 +90,19 @@ def reconnect(self): """If the Sender was disconnected from the service with a retryable error - attempt to reconnect.""" # pylint: disable=protected-access - pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) - unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] self._handler.close() + unsent_events = self._handler.pending_messages self._handler = SendClient( self.target, auth=self.client.get_auth(), debug=self.client.debug, - msg_timeout=Sender.TIMEOUT, + msg_timeout=self.timeout, error_policy=self.retry_policy, keep_alive_interval=self.keep_alive, client_name=self.name, properties=self.client.create_properties()) self._handler.open() - self._handler._pending_messages = unsent_events + self._handler.queue_message(*unsent_events) self._handler.wait() def get_handler_state(self): From 9a335368e8d685b7ad3c6d6e6d84f01a52242e54 Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 10 Aug 2018 11:17:54 -0700 Subject: [PATCH 2/4] Changed log formatting. 
Retry on reconnect --- azure/eventhub/_async/__init__.py | 10 ++-- azure/eventhub/_async/receiver_async.py | 32 ++++++++++-- azure/eventhub/_async/sender_async.py | 44 ++++++++++++++-- azure/eventhub/client.py | 10 ++-- azure/eventhub/receiver.py | 28 ++++++++-- azure/eventhub/sender.py | 28 ++++++++-- .../azure_storage_checkpoint_manager.py | 37 +++++++------ azure/eventprocessorhost/eh_partition_pump.py | 12 ++--- azure/eventprocessorhost/partition_context.py | 30 +++++------ azure/eventprocessorhost/partition_manager.py | 52 +++++++++---------- azure/eventprocessorhost/partition_pump.py | 16 +++--- 11 files changed, 201 insertions(+), 98 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index 6868462..68a0ef6 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -77,7 +77,7 @@ async def _start_client_async(self, client): try: await client.open_async() except Exception as exp: # pylint: disable=broad-except - log.info("Encountered error while starting handler: {}".format(exp)) + log.info("Encountered error while starting handler: %r", exp) await client.close_async(exception=exp) log.info("Finished closing failed handler") @@ -105,17 +105,17 @@ async def run_async(self): :rtype: list[~azure.eventhub.common.EventHubError] """ - log.info("{}: Starting {} clients".format(self.container_id, len(self.clients))) + log.info("%r: Starting %r clients", self.container_id, len(self.clients)) tasks = [self._start_client_async(c) for c in self.clients] try: await asyncio.gather(*tasks) redirects = [c.redirected for c in self.clients if c.redirected] failed = [c.error for c in self.clients if c.error] if failed and len(failed) == len(self.clients): - log.warning("{}: All clients failed to start.".format(self.container_id)) + log.warning("%r: All clients failed to start.", self.container_id) raise failed[0] elif failed: - log.warning("{}: {} clients failed to start.".format(self.container_id, 
len(failed))) + log.warning("%r: %r clients failed to start.", self.container_id, len(failed)) elif redirects: await self._handle_redirect(redirects) except EventHubError: @@ -130,7 +130,7 @@ async def stop_async(self): """ Stop the EventHubClient and all its Sender/Receiver clients. """ - log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients))) + log.info("%r: Stopping %r clients", self.container_id, len(self.clients)) self.stopped = True await self._close_clients_async() diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/_async/receiver_async.py index 4cf315c..0d49231 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/_async/receiver_async.py @@ -128,9 +128,33 @@ async def reconnect_async(self): client_name=self.name, properties=self.client.create_properties(), loop=self.loop) - await self._handler.open_async() - while not await self.has_started(): - await self._handler._connection.work_async() + try: + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncReceiver detached. Attempting reconnect.") + await self.reconnect_async() + else: + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("AsyncReceiver detached. Attempting reconnect.") + await self.reconnect_async() + else: + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). 
Shutting down.", e) + error = EventHubError("Receiver reconnect failed: {}".format(e)) + await self.close_async(exception=error) + raise error async def has_started(self): """ @@ -224,7 +248,7 @@ async def receive(self, max_batch_size=None, timeout=None): await self.close_async(exception=error) raise error except Exception as e: - log.info("Unexpected error occurred ({}). Shutting down.".format(e)) + log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Receive failed: {}".format(e)) await self.close_async(exception=error) raise error diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index f170d08..9ef6949 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -5,6 +5,7 @@ import uuid import asyncio +import logging from uamqp import constants, errors from uamqp import SendClientAsync @@ -13,6 +14,9 @@ from azure.eventhub.sender import Sender from azure.eventhub.common import _error_handler +log = logging.getLogger(__name__) + + class AsyncSender(Sender): """ Implements the async API of a Sender. @@ -107,9 +111,33 @@ async def reconnect_async(self): client_name=self.name, properties=self.client.create_properties(), loop=self.loop) - await self._handler.open_async() - self._handler.queue_message(*unsent_events) - await self._handler.wait_async() + try: + await self._handler.open_async() + self._handler.queue_message(*unsent_events) + await self._handler.wait_async() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncSender detached. Attempting reconnect.") + await self.reconnect_async() + else: + log.info("AsyncSender reconnect failed. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + log.info("AsyncSender detached. 
Attempting reconnect.") + await self.reconnect_async() + else: + log.info("AsyncSender reconnect failed. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) + error = EventHubError("Sender reconnect failed: {}".format(e)) + await self.close_async(exception=error) + raise error async def has_started(self): """ @@ -178,19 +206,24 @@ async def send(self, event_data): raise Sender._error(self._outcome, self._condition) except (errors.LinkDetach, errors.ConnectionClose) as shutdown: if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncSender detached. Attempting reconnect.") await self.reconnect_async() else: + log.info("AsyncSender detached. Shutting down.") error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: + log.info("AsyncSender detached. Attempting reconnect.") await self.reconnect_async() else: + log.info("AsyncSender detached. Shutting down.") error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error except Exception as e: + log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Send failed: {}".format(e)) await self.close_async(exception=error) raise error @@ -207,17 +240,22 @@ async def wait_async(self): await self._handler.wait_async() except (errors.LinkDetach, errors.ConnectionClose) as shutdown: if shutdown.action.retry and self.auto_reconnect: + log.info("AsyncSender detached. Attempting reconnect.") await self.reconnect_async() else: + log.info("AsyncSender detached. Shutting down.") error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: + log.info("AsyncSender detached. 
Attempting reconnect.") await self.reconnect_async() else: + log.info("AsyncSender detached. Shutting down.") error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error except Exception as e: + log.info("Unexpected error occurred (%r).", e) raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 250ad7a..7f252e5 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -131,7 +131,7 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox self.clients = [] self.stopped = False - log.info("{}: Created the Event Hub client".format(self.container_id)) + log.info("%r: Created the Event Hub client", self.container_id) @classmethod def from_connection_string(cls, conn_str, eventhub=None, **kwargs): @@ -265,16 +265,16 @@ def run(self): :rtype: list[~azure.eventhub.common.EventHubError] """ - log.info("{}: Starting {} clients".format(self.container_id, len(self.clients))) + log.info("%r: Starting %r clients", self.container_id, len(self.clients)) try: self._start_clients() redirects = [c.redirected for c in self.clients if c.redirected] failed = [c.error for c in self.clients if c.error] if failed and len(failed) == len(self.clients): - log.warning("{}: All clients failed to start.".format(self.container_id)) + log.warning("%r: All clients failed to start.", self.container_id) raise failed[0] elif failed: - log.warning("{}: {} clients failed to start.".format(self.container_id, len(failed))) + log.warning("%r: %r clients failed to start.", self.container_id, len(failed)) elif redirects: self._handle_redirect(redirects) except EventHubError: @@ -289,7 +289,7 @@ def stop(self): """ Stop the EventHubClient and all its Sender/Receiver clients. 
""" - log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients))) + log.info("%r: Stopping %r clients", self.container_id, len(self.clients)) self.stopped = True self._close_clients() diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index 90af41e..0b7b8a9 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -4,12 +4,15 @@ # -------------------------------------------------------------------------------------------- import uuid +import logging from uamqp import types, errors from uamqp import ReceiveClient, Source from azure.eventhub.common import EventHubError, EventData, _error_handler +log = logging.getLogger(__name__) + class Receiver: """ @@ -117,9 +120,28 @@ def reconnect(self): keep_alive_interval=self.keep_alive, client_name=self.name, properties=self.client.create_properties()) - self._handler.open() - while not self.has_started(): - self._handler._connection.work() + try: + self._handler.open() + while not self.has_started(): + self._handler._connection.work() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + self.reconnect() + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + self.reconnect() + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except Exception as e: + error = EventHubError("Receiver reconnect failed: {}".format(e)) + self.close(exception=error) + raise error def get_handler_state(self): """ diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index c8eb544..e0ed738 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -4,12 +4,15 @@ # -------------------------------------------------------------------------------------------- import uuid +import logging from uamqp import constants, errors from uamqp 
import SendClient from azure.eventhub.common import EventHubError, _error_handler +log = logging.getLogger(__name__) + class Sender: """ @@ -101,9 +104,28 @@ def reconnect(self): keep_alive_interval=self.keep_alive, client_name=self.name, properties=self.client.create_properties()) - self._handler.open() - self._handler.queue_message(*unsent_events) - self._handler.wait() + try: + self._handler.open() + self._handler.queue_message(*unsent_events) + self._handler.wait() + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry and self.auto_reconnect: + self.reconnect() + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except errors.MessageHandlerError as shutdown: + if self.auto_reconnect: + self.reconnect() + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except Exception as e: + error = EventHubError("Sender Reconnect failed: {}".format(e)) + self.close(exception=error) + raise error def get_handler_state(self): """ diff --git a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py index 001eafa..25351f2 100644 --- a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py +++ b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py @@ -160,7 +160,7 @@ async def create_lease_store_if_not_exists_async(self): self.lease_container_name)) except Exception as err: # pylint: disable=broad-except - _logger.error(repr(err)) + _logger.error("%r", err) raise err return True @@ -206,12 +206,12 @@ async def state(): partition_id)) return res.properties.lease.state except Exception as err: # pylint: disable=broad-except - _logger.error("Failed to get lease state {} {}".format(err, partition_id)) + _logger.error("Failed to get lease state %r %r", err, partition_id) lease.state = state return lease except Exception as err: # pylint: disable=broad-except - 
_logger.error("Failed to get lease {} {}".format(err, partition_id)) + _logger.error("Failed to get lease %r %r", err, partition_id) async def get_all_leases(self): """ @@ -242,10 +242,10 @@ async def create_lease_if_not_exists_async(self, partition_id): return_lease = AzureBlobLease() return_lease.partition_id = partition_id json_lease = json.dumps(return_lease.serializable()) - _logger.info("Creating Lease {} {} {}".format( + _logger.info("Creating Lease %r %r %r", self.lease_container_name, partition_id, - json_lease)) + json_lease) await self.host.loop.run_in_executor( self.executor, functools.partial( @@ -257,7 +257,7 @@ async def create_lease_if_not_exists_async(self, partition_id): try: return_lease = await self.get_lease_async(partition_id) except Exception as err: # pylint: disable=broad-except - _logger.error("Failed to create lease {!r}".format(err)) + _logger.error("Failed to create lease %r", err) raise err return return_lease @@ -308,7 +308,7 @@ async def acquire_lease_async(self, lease): # than it should, rebalancing will take care of that quickly enough. 
retval = False else: - _logger.info("ChangingLease {} {}".format(self.host.guid, lease.partition_id)) + _logger.info("ChangingLease %r %r", self.host.guid, lease.partition_id) await self.host.loop.run_in_executor( self.executor, functools.partial( @@ -319,7 +319,7 @@ async def acquire_lease_async(self, lease): new_lease_id)) lease.token = new_lease_id else: - _logger.info("AcquiringLease {} {}".format(self.host.guid, lease.partition_id)) + _logger.info("AcquiringLease %r %r", self.host.guid, lease.partition_id) lease.token = await self.host.loop.run_in_executor( self.executor, functools.partial( @@ -333,8 +333,7 @@ async def acquire_lease_async(self, lease): # check if this solves the issue retval = await self.update_lease_async(lease) except Exception as err: # pylint: disable=broad-except - _logger.error("Failed to acquire lease {!r} {} {}".format( - err, partition_id, lease.token)) + _logger.error("Failed to acquire lease %r %r %r", err, partition_id, lease.token) return False return retval @@ -361,10 +360,10 @@ async def renew_lease_async(self, lease): timeout=self.lease_duration)) except Exception as err: # pylint: disable=broad-except if "LeaseIdMismatchWithLeaseOperation" in str(err): - _logger.info("LeaseLost on partition {}".format(lease.partition_id)) + _logger.info("LeaseLost on partition %r", lease.partition_id) else: - _logger.error("Failed to renew lease on partition {} with token {} {!r}".format( - lease.partition_id, lease.token, err)) + _logger.error("Failed to renew lease on partition %r with token %r %r", + lease.partition_id, lease.token, err) return False return True @@ -380,7 +379,7 @@ async def release_lease_async(self, lease): """ lease_id = None try: - _logger.info("Releasing lease {} {}".format(self.host.guid, lease.partition_id)) + _logger.info("Releasing lease %r %r", self.host.guid, lease.partition_id) lease_id = lease.token released_copy = AzureBlobLease() released_copy.with_lease(lease) @@ -403,8 +402,8 @@ async def 
release_lease_async(self, lease): lease.partition_id, lease_id)) except Exception as err: # pylint: disable=broad-except - _logger.error("Failed to release lease {} {} {}".format( - err, lease.partition_id, lease_id)) + _logger.error("Failed to release lease %r %r %r", + err, lease.partition_id, lease_id) return False return True @@ -426,7 +425,7 @@ async def update_lease_async(self, lease): if not lease.token: return False - _logger.debug("Updating lease {} {}".format(self.host.guid, lease.partition_id)) + _logger.debug("Updating lease %r %r", self.host.guid, lease.partition_id) # First, renew the lease to make sure the update will go through. if await self.renew_lease_async(lease): @@ -441,8 +440,8 @@ async def update_lease_async(self, lease): lease_id=lease.token)) except Exception as err: # pylint: disable=broad-except - _logger.error("Failed to update lease {} {} {}".format( - self.host.guid, lease.partition_id, err)) + _logger.error("Failed to update lease %r %r %r", + self.host.guid, lease.partition_id, err) raise err else: return False diff --git a/azure/eventprocessorhost/eh_partition_pump.py b/azure/eventprocessorhost/eh_partition_pump.py index 368c2bb..5832b49 100644 --- a/azure/eventprocessorhost/eh_partition_pump.py +++ b/azure/eventprocessorhost/eh_partition_pump.py @@ -36,8 +36,8 @@ async def on_open_async(self): _opened_ok = True except Exception as err: # pylint: disable=broad-except _logger.warning( - "{},{} PartitionPumpWarning: Failure creating client or receiver, " - "retrying: {!r}".format(self.host.guid, self.partition_context.partition_id, err)) + "%r,%r PartitionPumpWarning: Failure creating client or receiver, " + "retrying: %r", self.host.guid, self.partition_context.partition_id, err) last_exception = err _retry_count += 1 @@ -102,7 +102,7 @@ async def on_closing_async(self, reason): except TypeError: _logger.debug("No partition pump running.") except Exception as err: # pylint: disable=broad-except - _logger.info("Error on closing 
partition pump: {!r}".format(err)) + _logger.info("Error on closing partition pump: %r", err) await self.clean_up_clients_async() @@ -128,13 +128,13 @@ async def run(self): max_batch_size=self.max_batch_size, timeout=self.recieve_timeout) except Exception as e: # pylint: disable=broad-except - _logger.info("Error raised while attempting to receive messages: {}".format(e)) + _logger.info("Error raised while attempting to receive messages: %r", e) await self.process_error_async(e) else: if not msgs: - _logger.info("No events received, queue size {}, release {}".format( + _logger.info("No events received, queue size %r, release %r", self.eh_partition_pump.partition_receive_handler.queue_size, - self.eh_partition_pump.host.eph_options.release_pump_on_timeout)) + self.eh_partition_pump.host.eph_options.release_pump_on_timeout) if self.eh_partition_pump.host.eph_options.release_pump_on_timeout: await self.process_error_async(TimeoutError("No events received")) else: diff --git a/azure/eventprocessorhost/partition_context.py b/azure/eventprocessorhost/partition_context.py index 33cc566..fb6926e 100644 --- a/azure/eventprocessorhost/partition_context.py +++ b/azure/eventprocessorhost/partition_context.py @@ -43,8 +43,8 @@ async def get_initial_offset_async(self): # throws InterruptedException, Executi :rtype: str """ - _logger.info("Calling user-provided initial offset provider {} {}".format( - self.host.guid, self.partition_id)) + _logger.info("Calling user-provided initial offset provider %r %r", + self.host.guid, self.partition_id) starting_checkpoint = await self.host.storage_manager.get_checkpoint_async(self.partition_id) if not starting_checkpoint: # No checkpoint was ever stored. 
Use the initialOffsetProvider instead @@ -55,8 +55,8 @@ async def get_initial_offset_async(self): # throws InterruptedException, Executi self.offset = starting_checkpoint.offset self.sequence_number = starting_checkpoint.sequence_number - _logger.info("{} {} Initial offset/sequenceNumber provided {}/{}".format( - self.host.guid, self.partition_id, self.offset, self.sequence_number)) + _logger.info("%r %r Initial offset/sequenceNumber provided %r/%r", + self.host.guid, self.partition_id, self.offset, self.sequence_number) return self.offset async def checkpoint_async(self): @@ -106,34 +106,34 @@ async def persist_checkpoint_async(self, checkpoint): :param checkpoint: The checkpoint to persist. :type checkpoint: ~azure.eventprocessorhost.checkpoint.Checkpoint """ - _logger.debug("PartitionPumpCheckpointStart {} {} {} {}".format( - self.host.guid, checkpoint.partition_id, checkpoint.offset, checkpoint.sequence_number)) + _logger.debug("PartitionPumpCheckpointStart %r %r %r %r", + self.host.guid, checkpoint.partition_id, checkpoint.offset, checkpoint.sequence_number) try: in_store_checkpoint = await self.host.storage_manager.get_checkpoint_async(checkpoint.partition_id) if not in_store_checkpoint or checkpoint.sequence_number >= in_store_checkpoint.sequence_number: if not in_store_checkpoint: - _logger.info("persisting checkpoint {}".format(checkpoint.__dict__)) + _logger.info("persisting checkpoint %r", checkpoint.__dict__) await self.host.storage_manager.create_checkpoint_if_not_exists_async(checkpoint.partition_id) if not await self.host.storage_manager.update_checkpoint_async(self.lease, checkpoint): - _logger.error("Failed to persist checkpoint for partition: {}".format(self.partition_id)) + _logger.error("Failed to persist checkpoint for partition: %r", self.partition_id) raise Exception("failed to persist checkpoint") self.lease.offset = checkpoint.offset self.lease.sequence_number = checkpoint.sequence_number else: _logger.error( - "Ignoring out of date 
checkpoint with offset {}/sequence number {} because " - "current persisted checkpoint has higher offset {}/sequence number {}".format( + "Ignoring out of date checkpoint with offset %r/sequence number %r because " + "current persisted checkpoint has higher offset %r/sequence number %r", checkpoint.offset, checkpoint.sequence_number, in_store_checkpoint.offset, - in_store_checkpoint.sequence_number)) + in_store_checkpoint.sequence_number) raise Exception("offset/sequenceNumber invalid") except Exception as err: - _logger.error("PartitionPumpCheckpointError {} {} {!r}".format( - self.host.guid, checkpoint.partition_id, err)) + _logger.error("PartitionPumpCheckpointError %r %r %r", + self.host.guid, checkpoint.partition_id, err) raise finally: - _logger.debug("PartitionPumpCheckpointStop {} {}".format( - self.host.guid, checkpoint.partition_id)) + _logger.debug("PartitionPumpCheckpointStop %r %r", + self.host.guid, checkpoint.partition_id) diff --git a/azure/eventprocessorhost/partition_manager.py b/azure/eventprocessorhost/partition_manager.py index c9c927c..5778ce3 100644 --- a/azure/eventprocessorhost/partition_manager.py +++ b/azure/eventprocessorhost/partition_manager.py @@ -57,7 +57,7 @@ async def start_async(self): raise Exception("A PartitionManager cannot be started multiple times.") partition_count = await self.initialize_stores_async() - _logger.info("{} PartitionCount: {}".format(self.host.guid, partition_count)) + _logger.info("%r PartitionCount: %r", self.host.guid, partition_count) self.run_task = asyncio.ensure_future(self.run_async()) async def stop_async(self): @@ -75,10 +75,10 @@ async def run_async(self): try: await self.run_loop_async() except Exception as err: # pylint: disable=broad-except - _logger.error("Run loop failed {!r}".format(err)) + _logger.error("Run loop failed %r", err) try: - _logger.info("Shutting down all pumps {}".format(self.host.guid)) + _logger.info("Shutting down all pumps %r", self.host.guid) await 
self.remove_all_pumps_async("Shutdown") except Exception as err: # pylint: disable=broad-except raise Exception("Failed to remove all pumps {!r}".format(err)) @@ -128,7 +128,7 @@ async def retry_async(self, func, partition_id, retry_message, await func(partition_id) created_okay = True except Exception as err: # pylint: disable=broad-except - _logger.error("{} {} {} {!r}".format(retry_message, host_id, partition_id, err)) + _logger.error("%r %r %r %r", retry_message, host_id, partition_id, err) retry_count += 1 if not created_okay: raise Exception(host_id, final_failure_message) @@ -172,28 +172,28 @@ async def run_loop_async(self): leases_owned_by_others, our_lease_count) if steal_this_lease: try: - _logger.info("Lease to steal {}".format(steal_this_lease.serializable())) + _logger.info("Lease to steal %r", steal_this_lease.serializable()) if await lease_manager.acquire_lease_async(steal_this_lease): - _logger.info("Stole lease sucessfully {} {}".format( - self.host.guid, steal_this_lease.partition_id)) + _logger.info("Stole lease sucessfully %r %r", + self.host.guid, steal_this_lease.partition_id) else: - _logger.info("Failed to steal lease for partition {} {}".format( - self.host.guid, steal_this_lease.partition_id)) + _logger.info("Failed to steal lease for partition %r %r", + self.host.guid, steal_this_lease.partition_id) except Exception as err: # pylint: disable=broad-except - _logger.error("Failed to steal lease {!r}".format(err)) + _logger.error("Failed to steal lease %r", err) for partition_id in all_leases: try: updated_lease = all_leases[partition_id] if updated_lease.owner == self.host.host_name: - _logger.debug("Attempting to renew lease {} {}".format( - self.host.guid, partition_id)) + _logger.debug("Attempting to renew lease %r %r", + self.host.guid, partition_id) await self.check_and_add_pump_async(partition_id, updated_lease) else: _logger.debug("Removing pump due to lost lease.") await self.remove_pump_async(partition_id, "LeaseLost") except 
Exception as err: # pylint: disable=broad-except - _logger.error("Failed to update lease {!r}".format(err)) + _logger.error("Failed to update lease %r", err) await asyncio.sleep(lease_manager.lease_renew_interval) async def check_and_add_pump_async(self, partition_id, lease): @@ -217,7 +217,7 @@ async def check_and_add_pump_async(self, partition_id, lease): # when the lease changes then the pump will error and shut down captured_pump.set_lease(lease) else: - _logger.info("Starting pump {} {}".format(self.host.guid, partition_id)) + _logger.info("Starting pump %r %r", self.host.guid, partition_id) await self.create_new_pump_async(partition_id, lease) async def create_new_pump_async(self, partition_id, lease): @@ -234,7 +234,7 @@ async def create_new_pump_async(self, partition_id, lease): # Do the put after start, if the start fails then put doesn't happen loop.create_task(partition_pump.open_async()) self.partition_pumps[partition_id] = partition_pump - _logger.info("Created new partition pump {} {}".format(self.host.guid, partition_id)) + _logger.info("Created new partition pump %r %r", self.host.guid, partition_id) async def remove_pump_async(self, partition_id, reason): """ @@ -251,14 +251,14 @@ async def remove_pump_async(self, partition_id, reason): await captured_pump.close_async(reason) # else, pump is already closing/closed, don't need to try to shut it down again del self.partition_pumps[partition_id] # remove pump - _logger.debug("Removed pump {} {}".format(self.host.guid, partition_id)) - _logger.debug("{} pumps still running".format(len(self.partition_pumps))) + _logger.debug("Removed pump %r %r", self.host.guid, partition_id) + _logger.debug("%r pumps still running", len(self.partition_pumps)) else: # PartitionManager main loop tries to remove pump for every partition that the # host does not own, just to be sure. Not finding a pump for a partition is normal # and expected most of the time. 
- _logger.debug("No pump found to remove for this partition {} {}".format( - self.host.guid, partition_id)) + _logger.debug("No pump found to remove for this partition %r %r", + self.host.guid, partition_id) async def remove_all_pumps_async(self, reason): """ @@ -332,8 +332,8 @@ async def attempt_renew_lease_async(self, lease_task, owned_by_others_q, lease_m try: possible_lease = await lease_task if await possible_lease.is_expired(): - logging.info("Trying to aquire lease {} {}".format( - self.host.guid, possible_lease.partition_id)) + _logger.info("Trying to aquire lease %r %r", + self.host.guid, possible_lease.partition_id) if await lease_manager.acquire_lease_async(possible_lease): owned_by_others_q.put((False, possible_lease)) else: @@ -341,20 +341,20 @@ async def attempt_renew_lease_async(self, lease_task, owned_by_others_q, lease_m elif possible_lease.owner == self.host.host_name: try: - _logger.debug("Trying to renew lease {} {}".format( - self.host.guid, possible_lease.partition_id)) + _logger.debug("Trying to renew lease %r %r", + self.host.guid, possible_lease.partition_id) if await lease_manager.renew_lease_async(possible_lease): owned_by_others_q.put((False, possible_lease)) else: owned_by_others_q.put((True, possible_lease)) except Exception as err: # pylint: disable=broad-except # Update to 'Lease Lost' exception. 
- _logger.error("Lease lost exception {!r} {} {}".format( - err, self.host.guid, possible_lease.partition_id)) + _logger.error("Lease lost exception %r %r %r", + err, self.host.guid, possible_lease.partition_id) owned_by_others_q.put((True, possible_lease)) else: owned_by_others_q.put((True, possible_lease)) except Exception as err: # pylint: disable=broad-except _logger.error( - "Failure during getting/acquiring/renewing lease, skipping {!r}".format(err)) + "Failure during getting/acquiring/renewing lease, skipping %r", err) diff --git a/azure/eventprocessorhost/partition_pump.py b/azure/eventprocessorhost/partition_pump.py index 769e2ce..57c7ae7 100644 --- a/azure/eventprocessorhost/partition_pump.py +++ b/azure/eventprocessorhost/partition_pump.py @@ -37,7 +37,7 @@ def set_pump_status(self, status): Updates pump status and logs update to console. """ self.pump_status = status - _logger.info("{} partition {}".format(status, self.lease.partition_id)) + _logger.info("%r partition %r", status, self.lease.partition_id) def set_lease(self, new_lease): """ @@ -99,15 +99,14 @@ async def close_async(self, reason): try: await self.on_closing_async(reason) if self.processor: - _logger.info("PartitionPumpInvokeProcessorCloseStart {} {} {}".format( - self.host.guid, self.partition_context.partition_id, reason)) + _logger.info("PartitionPumpInvokeProcessorCloseStart %r %r %r", + self.host.guid, self.partition_context.partition_id, reason) await self.processor.close_async(self.partition_context, reason) - _logger.info("PartitionPumpInvokeProcessorCloseStart {} {}".format( - self.host.guid, self.partition_context.partition_id)) + _logger.info("PartitionPumpInvokeProcessorCloseStart %r %r", + self.host.guid, self.partition_context.partition_id) except Exception as err: # pylint: disable=broad-except await self.process_error_async(err) - _logger.error("{} {} {!r}".format( - self.host.guid, self.partition_context.partition_id, err)) + _logger.error("%r %r %r", self.host.guid, 
self.partition_context.partition_id, err) raise err if reason == "LeaseLost": @@ -115,8 +114,7 @@ async def close_async(self, reason): _logger.info("Lease Lost releasing ownership") await self.host.storage_manager.release_lease_async(self.partition_context.lease) except Exception as err: # pylint: disable=broad-except - _logger.error("{} {} {!r}".format( - self.host.guid, self.partition_context.partition_id, err)) + _logger.error("%r %r %r", self.host.guid, self.partition_context.partition_id, err) raise err self.set_pump_status("Closed") From 5692cb0dabe0a5cad315320009d4903e5dcff68e Mon Sep 17 00:00:00 2001 From: annatisch Date: Mon, 20 Aug 2018 11:26:43 -0700 Subject: [PATCH 3/4] Pylint fixes --- azure/eventhub/_async/__init__.py | 4 +++- azure/eventhub/_async/sender_async.py | 4 +++- azure/eventhub/client.py | 6 ++++-- .../azure_storage_checkpoint_manager.py | 12 +++++------ azure/eventprocessorhost/eh_partition_pump.py | 4 ++-- azure/eventprocessorhost/partition_context.py | 20 +++++++++---------- azure/eventprocessorhost/partition_manager.py | 14 ++++++------- azure/eventprocessorhost/partition_pump.py | 4 ++-- 8 files changed, 37 insertions(+), 31 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index 68a0ef6..5b102b5 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -225,7 +225,9 @@ def add_async_epoch_receiver( self.clients.append(handler) return handler - def add_async_sender(self, partition=None, operation=None, send_timeout=60, keep_alive=30, auto_reconnect=True, loop=None): + def add_async_sender( + self, partition=None, operation=None, send_timeout=60, + keep_alive=30, auto_reconnect=True, loop=None): """ Add an async sender to the client to send ~azure.eventhub.common.EventData object to an EventHub. 
diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index 9ef6949..bd72dd5 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -22,7 +22,9 @@ class AsyncSender(Sender): Implements the async API of a Sender. """ - def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=None, auto_reconnect=True, loop=None): # pylint: disable=super-init-not-called + def __init__( # pylint: disable=super-init-not-called + self, client, target, partition=None, send_timeout=60, + keep_alive=None, auto_reconnect=True, loop=None): """ Instantiate an EventHub event SenderAsync handler. diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 7f252e5..3a61c60 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -88,7 +88,7 @@ class EventHubClient(object): events to and receiving events from the Azure Event Hubs service. """ - def __init__(self, address, username=None, password=None, debug=False, http_proxy=None, auth_timeout=0): + def __init__(self, address, username=None, password=None, debug=False, http_proxy=None, auth_timeout=60): """ Constructs a new EventHubClient with the given address URL. 
@@ -418,6 +418,8 @@ def add_sender(self, partition=None, operation=None, send_timeout=60, keep_alive target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: target = target + operation - handler = Sender(self, target, partition=partition, send_timeout=send_timeout, keep_alive=keep_alive, auto_reconnect=auto_reconnect) + handler = Sender( + self, target, partition=partition, send_timeout=send_timeout, + keep_alive=keep_alive, auto_reconnect=auto_reconnect) self.clients.append(handler) return handler diff --git a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py index 25351f2..8ac3abe 100644 --- a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py +++ b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py @@ -243,9 +243,9 @@ async def create_lease_if_not_exists_async(self, partition_id): return_lease.partition_id = partition_id json_lease = json.dumps(return_lease.serializable()) _logger.info("Creating Lease %r %r %r", - self.lease_container_name, - partition_id, - json_lease) + self.lease_container_name, + partition_id, + json_lease) await self.host.loop.run_in_executor( self.executor, functools.partial( @@ -363,7 +363,7 @@ async def renew_lease_async(self, lease): _logger.info("LeaseLost on partition %r", lease.partition_id) else: _logger.error("Failed to renew lease on partition %r with token %r %r", - lease.partition_id, lease.token, err) + lease.partition_id, lease.token, err) return False return True @@ -403,7 +403,7 @@ async def release_lease_async(self, lease): lease_id)) except Exception as err: # pylint: disable=broad-except _logger.error("Failed to release lease %r %r %r", - err, lease.partition_id, lease_id) + err, lease.partition_id, lease_id) return False return True @@ -441,7 +441,7 @@ async def update_lease_async(self, lease): except Exception as err: # pylint: disable=broad-except _logger.error("Failed to update lease %r %r %r", 
- self.host.guid, lease.partition_id, err) + self.host.guid, lease.partition_id, err) raise err else: return False diff --git a/azure/eventprocessorhost/eh_partition_pump.py b/azure/eventprocessorhost/eh_partition_pump.py index 5832b49..995440a 100644 --- a/azure/eventprocessorhost/eh_partition_pump.py +++ b/azure/eventprocessorhost/eh_partition_pump.py @@ -133,8 +133,8 @@ async def run(self): else: if not msgs: _logger.info("No events received, queue size %r, release %r", - self.eh_partition_pump.partition_receive_handler.queue_size, - self.eh_partition_pump.host.eph_options.release_pump_on_timeout) + self.eh_partition_pump.partition_receive_handler.queue_size, + self.eh_partition_pump.host.eph_options.release_pump_on_timeout) if self.eh_partition_pump.host.eph_options.release_pump_on_timeout: await self.process_error_async(TimeoutError("No events received")) else: diff --git a/azure/eventprocessorhost/partition_context.py b/azure/eventprocessorhost/partition_context.py index fb6926e..510fdd6 100644 --- a/azure/eventprocessorhost/partition_context.py +++ b/azure/eventprocessorhost/partition_context.py @@ -44,7 +44,7 @@ async def get_initial_offset_async(self): # throws InterruptedException, Executi :rtype: str """ _logger.info("Calling user-provided initial offset provider %r %r", - self.host.guid, self.partition_id) + self.host.guid, self.partition_id) starting_checkpoint = await self.host.storage_manager.get_checkpoint_async(self.partition_id) if not starting_checkpoint: # No checkpoint was ever stored. 
Use the initialOffsetProvider instead @@ -56,7 +56,7 @@ async def get_initial_offset_async(self): # throws InterruptedException, Executi self.sequence_number = starting_checkpoint.sequence_number _logger.info("%r %r Initial offset/sequenceNumber provided %r/%r", - self.host.guid, self.partition_id, self.offset, self.sequence_number) + self.host.guid, self.partition_id, self.offset, self.sequence_number) return self.offset async def checkpoint_async(self): @@ -107,7 +107,7 @@ async def persist_checkpoint_async(self, checkpoint): :type checkpoint: ~azure.eventprocessorhost.checkpoint.Checkpoint """ _logger.debug("PartitionPumpCheckpointStart %r %r %r %r", - self.host.guid, checkpoint.partition_id, checkpoint.offset, checkpoint.sequence_number) + self.host.guid, checkpoint.partition_id, checkpoint.offset, checkpoint.sequence_number) try: in_store_checkpoint = await self.host.storage_manager.get_checkpoint_async(checkpoint.partition_id) if not in_store_checkpoint or checkpoint.sequence_number >= in_store_checkpoint.sequence_number: @@ -122,18 +122,18 @@ async def persist_checkpoint_async(self, checkpoint): self.lease.sequence_number = checkpoint.sequence_number else: _logger.error( - "Ignoring out of date checkpoint with offset %r/sequence number %r because " + "Ignoring out of date checkpoint with offset %r/sequence number %r because " + "current persisted checkpoint has higher offset %r/sequence number %r", - checkpoint.offset, - checkpoint.sequence_number, - in_store_checkpoint.offset, - in_store_checkpoint.sequence_number) + checkpoint.offset, + checkpoint.sequence_number, + in_store_checkpoint.offset, + in_store_checkpoint.sequence_number) raise Exception("offset/sequenceNumber invalid") except Exception as err: _logger.error("PartitionPumpCheckpointError %r %r %r", - self.host.guid, checkpoint.partition_id, err) + self.host.guid, checkpoint.partition_id, err) raise finally: _logger.debug("PartitionPumpCheckpointStop %r %r", - self.host.guid, 
checkpoint.partition_id) + self.host.guid, checkpoint.partition_id) diff --git a/azure/eventprocessorhost/partition_manager.py b/azure/eventprocessorhost/partition_manager.py index 5778ce3..41aaded 100644 --- a/azure/eventprocessorhost/partition_manager.py +++ b/azure/eventprocessorhost/partition_manager.py @@ -175,10 +175,10 @@ async def run_loop_async(self): _logger.info("Lease to steal %r", steal_this_lease.serializable()) if await lease_manager.acquire_lease_async(steal_this_lease): _logger.info("Stole lease sucessfully %r %r", - self.host.guid, steal_this_lease.partition_id) + self.host.guid, steal_this_lease.partition_id) else: _logger.info("Failed to steal lease for partition %r %r", - self.host.guid, steal_this_lease.partition_id) + self.host.guid, steal_this_lease.partition_id) except Exception as err: # pylint: disable=broad-except _logger.error("Failed to steal lease %r", err) @@ -187,7 +187,7 @@ async def run_loop_async(self): updated_lease = all_leases[partition_id] if updated_lease.owner == self.host.host_name: _logger.debug("Attempting to renew lease %r %r", - self.host.guid, partition_id) + self.host.guid, partition_id) await self.check_and_add_pump_async(partition_id, updated_lease) else: _logger.debug("Removing pump due to lost lease.") @@ -258,7 +258,7 @@ async def remove_pump_async(self, partition_id, reason): # host does not own, just to be sure. Not finding a pump for a partition is normal # and expected most of the time. 
_logger.debug("No pump found to remove for this partition %r %r", - self.host.guid, partition_id) + self.host.guid, partition_id) async def remove_all_pumps_async(self, reason): """ @@ -333,7 +333,7 @@ async def attempt_renew_lease_async(self, lease_task, owned_by_others_q, lease_m possible_lease = await lease_task if await possible_lease.is_expired(): _logger.info("Trying to aquire lease %r %r", - self.host.guid, possible_lease.partition_id) + self.host.guid, possible_lease.partition_id) if await lease_manager.acquire_lease_async(possible_lease): owned_by_others_q.put((False, possible_lease)) else: @@ -342,7 +342,7 @@ async def attempt_renew_lease_async(self, lease_task, owned_by_others_q, lease_m elif possible_lease.owner == self.host.host_name: try: _logger.debug("Trying to renew lease %r %r", - self.host.guid, possible_lease.partition_id) + self.host.guid, possible_lease.partition_id) if await lease_manager.renew_lease_async(possible_lease): owned_by_others_q.put((False, possible_lease)) else: @@ -350,7 +350,7 @@ async def attempt_renew_lease_async(self, lease_task, owned_by_others_q, lease_m except Exception as err: # pylint: disable=broad-except # Update to 'Lease Lost' exception. 
_logger.error("Lease lost exception %r %r %r", - err, self.host.guid, possible_lease.partition_id) + err, self.host.guid, possible_lease.partition_id) owned_by_others_q.put((True, possible_lease)) else: owned_by_others_q.put((True, possible_lease)) diff --git a/azure/eventprocessorhost/partition_pump.py b/azure/eventprocessorhost/partition_pump.py index 57c7ae7..be8be04 100644 --- a/azure/eventprocessorhost/partition_pump.py +++ b/azure/eventprocessorhost/partition_pump.py @@ -100,10 +100,10 @@ async def close_async(self, reason): await self.on_closing_async(reason) if self.processor: _logger.info("PartitionPumpInvokeProcessorCloseStart %r %r %r", - self.host.guid, self.partition_context.partition_id, reason) + self.host.guid, self.partition_context.partition_id, reason) await self.processor.close_async(self.partition_context, reason) _logger.info("PartitionPumpInvokeProcessorCloseStart %r %r", - self.host.guid, self.partition_context.partition_id) + self.host.guid, self.partition_context.partition_id) except Exception as err: # pylint: disable=broad-except await self.process_error_async(err) _logger.error("%r %r %r", self.host.guid, self.partition_context.partition_id, err) From c8db7936362f5db68cbe8248b40a353d97c97425 Mon Sep 17 00:00:00 2001 From: annatisch Date: Wed, 22 Aug 2018 14:10:10 -0700 Subject: [PATCH 4/4] Renamed internal async module --- HISTORY.rst | 12 +++++++++++ README.rst | 5 +++++ azure/eventhub/__init__.py | 4 ++-- .../{_async => async_ops}/__init__.py | 6 +++--- .../{_async => async_ops}/receiver_async.py | 4 ++-- .../{_async => async_ops}/sender_async.py | 4 ++-- azure/eventhub/client.py | 20 +++++++++---------- azure/eventhub/common.py | 13 ++++++++++-- azure/eventhub/sender.py | 2 +- azure/eventprocessorhost/eh_partition_pump.py | 4 ++-- setup.py | 2 +- 11 files changed, 51 insertions(+), 25 deletions(-) rename azure/eventhub/{_async => async_ops}/__init__.py (98%) rename azure/eventhub/{_async => async_ops}/receiver_async.py (98%) rename 
azure/eventhub/{_async => async_ops}/sender_async.py (98%) diff --git a/HISTORY.rst b/HISTORY.rst index d60b724..6e90e55 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,18 @@ Release History =============== +1.0.0 (2018-08-22) +++++++++++++++++++ + +- API stable. +- Renamed internal `_async` module to `async_ops` for docs generation. +- Added optional `auth_timeout` parameter to `EventHubClient` and `EventHubClientAsync` to configure how long to allow for token + negotiation to complete. Default is 60 seconds. +- Added optional `send_timeout` parameter to `EventHubClient.add_sender` and `EventHubClientAsync.add_async_sender` to determine the + timeout for Events to be successfully sent. Default value is 60 seconds. +- Reformatted logging for performance. + + 0.2.0 (2018-08-06) ++++++++++++++++++ diff --git a/README.rst b/README.rst index 616771e..2d75dc3 100644 --- a/README.rst +++ b/README.rst @@ -26,6 +26,11 @@ Python 2.7 support The uAMQP library currently only supports Python 3.4 and above. Python 2.7 support is planned for a future release. +Documentation ++++++++++++++ +Reference documentation is available at `docs.microsoft.com/python/api/azure-eventhub <https://docs.microsoft.com/python/api/azure-eventhub>`__. + + Examples +++++++++ diff --git a/azure/eventhub/__init__.py b/azure/eventhub/__init__.py index ae780c2..3cde06c 100644 --- a/azure/eventhub/__init__.py +++ b/azure/eventhub/__init__.py @@ -3,7 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. 
# -------------------------------------------------------------------------------------------- -__version__ = "0.2.0" +__version__ = "1.0.0" from azure.eventhub.common import EventData, EventHubError, Offset from azure.eventhub.client import EventHubClient @@ -11,7 +11,7 @@ from azure.eventhub.receiver import Receiver try: - from azure.eventhub._async import ( + from azure.eventhub.async_ops import ( EventHubClientAsync, AsyncSender, AsyncReceiver) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/async_ops/__init__.py similarity index 98% rename from azure/eventhub/_async/__init__.py rename to azure/eventhub/async_ops/__init__.py index 5b102b5..7774724 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/async_ops/__init__.py @@ -183,7 +183,7 @@ def add_async_receiver( :operation: An optional operation to be appended to the hostname in the source URL. The value must start with `/` character. :type operation: str - :rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync + :rtype: ~azure.eventhub.async_ops.receiver_async.ReceiverAsync """ path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( @@ -214,7 +214,7 @@ def add_async_epoch_receiver( :operation: An optional operation to be appended to the hostname in the source URL. The value must start with `/` character. :type operation: str - :rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync + :rtype: ~azure.eventhub.async_ops.receiver_async.ReceiverAsync """ path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( @@ -249,7 +249,7 @@ def add_async_sender( :param auto_reconnect: Whether to automatically reconnect the sender if a retryable error occurs. Default value is `True`. 
:type auto_reconnect: bool - :rtype: ~azure.eventhub._async.sender_async.SenderAsync + :rtype: ~azure.eventhub.async_ops.sender_async.SenderAsync """ target = "amqps://{}{}".format(self.address.hostname, self.address.path) if operation: diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/async_ops/receiver_async.py similarity index 98% rename from azure/eventhub/_async/receiver_async.py rename to azure/eventhub/async_ops/receiver_async.py index 0d49231..ad04520 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/async_ops/receiver_async.py @@ -29,7 +29,7 @@ def __init__( # pylint: disable=super-init-not-called Instantiate an async receiver. :param client: The parent EventHubClientAsync. - :type client: ~azure.eventhub._async.EventHubClientAsync + :type client: ~azure.eventhub.async_ops.EventHubClientAsync :param source: The source EventHub from which to receive events. :type source: ~uamqp.address.Source :param prefetch: The number of events to prefetch from the service @@ -78,7 +78,7 @@ async def open_async(self): context will be used to create a new handler before opening it. :param connection: The underlying client shared connection. - :type: connection: ~uamqp._async.connection_async.ConnectionAsync + :type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync """ # pylint: disable=protected-access if self.redirected: diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/async_ops/sender_async.py similarity index 98% rename from azure/eventhub/_async/sender_async.py rename to azure/eventhub/async_ops/sender_async.py index bd72dd5..098c026 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/async_ops/sender_async.py @@ -29,7 +29,7 @@ def __init__( # pylint: disable=super-init-not-called Instantiate an EventHub event SenderAsync handler. :param client: The parent EventHubClientAsync. 
- :type client: ~azure.eventhub._async.EventHubClientAsync + :type client: ~azure.eventhub.async_ops.EventHubClientAsync :param target: The URI of the EventHub to send to. :type target: str :param partition: The specific partition ID to send to. Default is `None`, in which case the service @@ -80,7 +80,7 @@ async def open_async(self): context will be used to create a new handler before opening it. :param connection: The underlying client shared connection. - :type: connection:~uamqp._async.connection_async.ConnectionAsync + :type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync """ if self.redirected: self.target = self.redirected.address diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 3a61c60..43c3b65 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -109,7 +109,7 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox Additionally the following keys may also be present: 'username', 'password'. :type http_proxy: dict[str, Any] :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. - The default value is 60 seconds. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :type auth_timeout: int """ self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] @@ -142,6 +142,7 @@ def from_connection_string(cls, conn_str, eventhub=None, **kwargs): :type conn_str: str :param eventhub: The name of the EventHub, if the EntityName is not included in the connection string. + :type eventhub: str :param debug: Whether to output network trace logs to the logger. Default is `False`. :type debug: bool @@ -150,7 +151,7 @@ def from_connection_string(cls, conn_str, eventhub=None, **kwargs): Additionally the following keys may also be present: 'username', 'password'. :type http_proxy: dict[str, Any] :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. 
- The default value is 60 seconds. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :type auth_timeout: int """ address, policy, key, entity = _parse_conn_str(conn_str) @@ -173,7 +174,7 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): Additionally the following keys may also be present: 'username', 'password'. :type http_proxy: dict[str, Any] :param auth_timeout: The time in seconds to wait for a token to be authorized by the service. - The default value is 60 seconds. + The default value is 60 seconds. If set to 0, no timeout will be enforced from the client. :type auth_timeout: int """ address, policy, key, _ = _parse_conn_str(conn_str) @@ -297,11 +298,11 @@ def get_eventhub_info(self): """ Get details on the specified EventHub. Keys in the details dictionary include: - -'name' - -'type' - -'created_at' - -'partition_count' - -'partition_ids' + -'name' + -'type' + -'created_at' + -'partition_count' + -'partition_ids' :rtype: dict """ @@ -394,8 +395,7 @@ def add_epoch_receiver( def add_sender(self, partition=None, operation=None, send_timeout=60, keep_alive=30, auto_reconnect=True): """ - Add a sender to the client to send ~azure.eventhub.common.EventData object - to an EventHub. + Add a sender to the client to send EventData object to an EventHub. :param partition: Optionally specify a particular partition to send to. If omitted, the events will be distributed to available partitions via diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index 035a812..af4db4e 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -45,7 +45,7 @@ def _error_handler(error): class EventData(object): """ The EventData class is a holder of event content. - Acts as a wrapper to an ~uamqp.message.Message object. + Acts as a wrapper to an uamqp.message.Message object. PROP_SEQ_NUMBER = b"x-opt-sequence-number" @@ -186,7 +186,7 @@ def body(self): """ The body of the event data object. 
- :rtype: bytes or generator[bytes] + :rtype: bytes or Generator[bytes] """ return self.message.get_data() @@ -194,6 +194,7 @@ def body(self): class Offset(object): """ The offset (position or timestamp) where a receiver starts. Examples: + Beginning of the event stream: >>> offset = Offset("-1") End of the event stream: @@ -238,6 +239,14 @@ def selector(self): class EventHubError(Exception): """ Represents an error happened in the client. + + :ivar message: The error message. + :vartype message: str + :ivar error: The error condition, if available. + :vartype error: str + :ivar details: The error details, if included in the + service response. + :vartype details: dict[str, str] """ def __init__(self, message, details=None): diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index e0ed738..b7fef5e 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -235,7 +235,7 @@ def transfer(self, event_data, callback=None): :type event_data: ~azure.eventhub.common.EventData :param callback: Callback to be run once the message has been send. This must be a function that accepts two arguments. 
- :type callback: func[~uamqp.constants.MessageSendResult, ~azure.eventhub.common.EventHubError] + :type callback: callable[~uamqp.constants.MessageSendResult, ~azure.eventhub.common.EventHubError] """ if self.error: raise self.error diff --git a/azure/eventprocessorhost/eh_partition_pump.py b/azure/eventprocessorhost/eh_partition_pump.py index 995440a..4ebd6a9 100644 --- a/azure/eventprocessorhost/eh_partition_pump.py +++ b/azure/eventprocessorhost/eh_partition_pump.py @@ -36,7 +36,7 @@ async def on_open_async(self): _opened_ok = True except Exception as err: # pylint: disable=broad-except _logger.warning( - "%r,%r PartitionPumpWarning: Failure creating client or receiver, " + "%r,%r PartitionPumpWarning: Failure creating client or receiver, " + "retrying: %r", self.host.guid, self.partition_context.partition_id, err) last_exception = err _retry_count += 1 @@ -91,7 +91,7 @@ async def clean_up_clients_async(self): async def on_closing_async(self, reason): """ - Overides partition pump on cleasing. + Overrides partition pump on closing. :param reason: The reason for the shutdown. :type reason: str diff --git a/setup.py b/setup.py index 8efb8aa..891a80e 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ zip_safe=False, packages=find_packages(exclude=["examples", "tests"]), install_requires=[ - 'uamqp>=1.0.0,<2.0.0', + 'uamqp>=1.0.0,<2.0.0', 'msrestazure~=0.4.11', 'azure-common~=1.1', 'azure-storage~=0.36.0'