From 62c8e83c90a949ee52a0e623115b0dcfd2aa55e2 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 26 Jul 2018 12:09:08 -0700 Subject: [PATCH] Restructure for independent connections --- azure/eventhub/_async/__init__.py | 62 +++----------- azure/eventhub/_async/receiver_async.py | 91 ++++++++++++++++----- azure/eventhub/_async/sender_async.py | 98 +++++++++++++++------- azure/eventhub/client.py | 80 +++++++----------- azure/eventhub/common.py | 13 ++- azure/eventhub/receiver.py | 103 ++++++++++++++++-------- azure/eventhub/sender.py | 78 ++++++++++-------- 7 files changed, 304 insertions(+), 221 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index e961302..3041da5 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -15,7 +15,6 @@ from uamqp import authentication, constants, types, errors from uamqp import ( Message, - Source, ConnectionAsync, AMQPClientAsync, SendClientAsync, @@ -59,29 +58,6 @@ def _create_auth(self, username=None, password=None): # pylint: disable=no-self return authentication.SASLPlain(self.address.hostname, username, password) return authentication.SASTokenAsync.from_shared_access_key(self.auth_uri, username, password, timeout=60) - def _create_connection_async(self): - """ - Create a new ~uamqp._async.connection_async.ConnectionAsync instance that will be shared between all - AsyncSender/AsyncReceiver clients. - """ - if not self.connection: - log.info("{}: Creating connection with address={}".format( - self.container_id, self.address.geturl())) - self.connection = ConnectionAsync( - self.address.hostname, - self.auth, - container_id=self.container_id, - properties=self._create_properties(), - debug=self.debug) - - async def _close_connection_async(self): - """ - Close and destroy the connection async. - """ - if self.connection: - await self.connection.destroy_async() - self.connection = None - async def _close_clients_async(self): """ Close all open AsyncSender/AsyncReceiver clients. @@ -91,17 +67,13 @@ async def _close_clients_async(self): async def _wait_for_client(self, client): try: while client.get_handler_state().value == 2: - await self.connection.work_async() + await client._handler._connection.work_async() except Exception as exp: # pylint: disable=broad-except await client.close_async(exception=exp) async def _start_client_async(self, client): try: - await client.open_async(self.connection) - started = await client.has_started() - while not started: - await self.connection.work_async() - started = await client.has_started() + await client.open_async() except Exception as exp: # pylint: disable=broad-except await client.close_async(exception=exp) @@ -114,13 +86,8 @@ async def _handle_redirect(self, redirects): redirects = [c.redirected for c in self.clients if c.redirected] if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self.auth_uri = redirects[0].address.decode('utf-8') - self.auth = self._create_auth() - new_target, _, _ = self.auth_uri.partition("/ConsumerGroups") - self.address = urlparse(new_target) - self.mgmt_node = new_target.encode('UTF-8') + b"/$management" - await self.connection.redirect_async(redirects[0], self.auth) - await asyncio.gather(*[c.open_async(self.connection) for c in self.clients]) + self._process_redirect_uri(redirects[0]) + await asyncio.gather(*[c.open_async() for c in self.clients]) async def run_async(self): """ @@ -135,7 +102,6 @@ async def run_async(self): :rtype: list[~azure.eventhub.common.EventHubError] """ log.info("{}: Starting {} clients".format(self.container_id, len(self.clients))) - self._create_connection_async() tasks = [self._start_client_async(c) for c in self.clients] try: await asyncio.gather(*tasks) @@ -163,7 +129,6 @@ async def stop_async(self): log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients))) self.stopped = True await self._close_clients_async() - await self._close_connection_async() async def get_eventhub_info_async(self): """ @@ -171,18 +136,18 @@ async def get_eventhub_info_async(self): :rtype: dict """ - eh_name = self.address.path.lstrip('/') - target = "amqps://{}/{}".format(self.address.hostname, eh_name) + alt_creds = { + "username": self._auth_config.get("iot_username"), + "password":self._auth_config.get("iot_password")} try: - mgmt_auth = self._create_auth() - mgmt_client = AMQPClientAsync(target, auth=mgmt_auth, debug=self.debug) + mgmt_auth = self._create_auth(**alt_creds) + mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug) await mgmt_client.open_async() - mgmt_msg = Message(application_properties={'name': eh_name}) + mgmt_msg = Message(application_properties={'name': self.eh_name}) response = await mgmt_client.mgmt_request_async( mgmt_msg, constants.READ_OPERATION, op_type=b'com.microsoft:eventhub', - node=self.mgmt_node, status_code_field=b'status-code', description_fields=b'status-description') eh_info = response.get_data() @@ -215,12 +180,11 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30 :rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync """ path = self.address.path + operation if operation else self.address.path + source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - source = Source(source_url) - if offset is not None: - source.set_filter(offset.selector()) - handler = AsyncReceiver(self, source, prefetch=prefetch, loop=loop) + print("RECEIVER_PATH", source_url) + handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop) self.clients.append(handler) return handler diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/_async/receiver_async.py index b3b5138..072e04a 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/_async/receiver_async.py @@ -6,10 +6,11 @@ import asyncio from uamqp import errors, types -from uamqp import ReceiveClientAsync +from uamqp import ReceiveClientAsync, Source from azure.eventhub import EventHubError, EventData from azure.eventhub.receiver import Receiver +from azure.eventhub.common import _error_handler class AsyncReceiver(Receiver): @@ -17,7 +18,7 @@ class AsyncReceiver(Receiver): Implements the async API of a Receiver. """ - def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called + def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called """ Instantiate an async receiver. @@ -33,25 +34,32 @@ def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pyli :param loop: An event loop. """ self.loop = loop or asyncio.get_event_loop() + self.client = client + self.source = source + self.offset = offset + self.prefetch = prefetch + self.epoch = epoch + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.redirected = None self.error = None - self.debug = client.debug - self.offset = None - self.prefetch = prefetch self.properties = None - self.epoch = epoch + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) if epoch: self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))} self._handler = ReceiveClientAsync( source, - auth=client.auth, - debug=self.debug, + auth=self.client.get_auth(), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, loop=self.loop) - async def open_async(self, connection): + async def open_async(self): """ Open the Receiver using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -61,15 +69,51 @@ async def open_async(self, connection): :type: connection: ~uamqp._async.connection_async.ConnectionAsync """ if self.redirected: + self.source = self.redirected.address + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = ReceiveClientAsync( - self.redirected.address, - auth=None, - debug=self.debug, + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, loop=self.loop) - await self._handler.open_async(connection=connection) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() + + async def reconnect_async(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + await self._handler.close_async() + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + self._handler = ReceiveClientAsync( + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() async def has_started(self): """ @@ -131,25 +175,28 @@ async def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 message_batch = await self._handler.receive_message_batch_async( max_batch_size=max_batch_size, timeout=timeout_ms) - data_batch = [] for message in message_batch: event_data = EventData(message=message) self.offset = event_data.offset data_batch.append(event_data) return data_batch - except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - await self.close_async(exception=error) - raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - await self.close_async(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + return data_batch + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except (errors.MessageHandlerError): + await self.reconnect_async() + return data_batch except Exception as e: error = EventHubError("Receive failed: {}".format(e)) await self.close_async(exception=error) diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index abcd0fa..3a10d6f 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -10,6 +10,7 @@ from azure.eventhub import EventHubError from azure.eventhub.sender import Sender +from azure.eventhub.common import _error_handler class AsyncSender(Sender): """ @@ -26,23 +27,28 @@ def __init__(self, client, target, partition=None, loop=None): # pylint: disabl :type target: str :param loop: An event loop. """ + self.loop = loop or asyncio.get_event_loop() + self.client = client + self.target = target + self.partition = partition + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.redirected = None self.error = None - self.debug = client.debug - self.partition = partition if partition: - target += "/Partitions/" + partition - self.loop = loop or asyncio.get_event_loop() + self.target += "/Partitions/" + partition self._handler = SendClientAsync( - target, - auth=client.auth, - debug=self.debug, + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), loop=self.loop) self._outcome = None self._condition = None - async def open_async(self, connection): + async def open_async(self): """ Open the Sender using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -52,12 +58,44 @@ async def open_async(self, connection): :type: connection:~uamqp._async.connection_async.ConnectionAsync """ if self.redirected: + self.target = self.redirected.address + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = SendClientAsync( - self.redirected.address, - auth=None, - debug=self.debug, - msg_timeout=Sender.TIMEOUT) - await self._handler.open_async(connection=connection) + self.target, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() + + async def reconnect_async(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) + unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + await self._handler.close_async() + self._handler = SendClientAsync( + self.target, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + self._handler._pending_messages = unsent_events + await self._handler.wait_async() async def has_started(self): """ @@ -124,14 +162,15 @@ async def send(self, event_data): await self._handler.send_message_async(event_data.message) if self._outcome != constants.MessageSendResult.Ok: raise Sender._error(self._outcome, self._condition) - except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - await self.close_async(exception=error) - raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - await self.close_async(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except (errors.MessageHandlerError): + await self.reconnect_async() except Exception as e: error = EventHubError("Send failed: {}".format(e)) await self.close_async(exception=error) @@ -147,13 +186,14 @@ async def wait_async(self): raise self.error try: await self._handler.wait_async() - except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - await self.close_async(exception=error) - raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - await self.close_async(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except (errors.MessageHandlerError): + await self.reconnect_async() except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index d85df56..1659f70 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -8,6 +8,7 @@ import sys import uuid import time +import functools try: from urllib import urlparse, unquote_plus, urlencode, quote_plus except ImportError: @@ -16,7 +17,6 @@ import uamqp from uamqp import Connection from uamqp import Message -from uamqp import Source from uamqp import authentication from uamqp import constants @@ -108,7 +108,8 @@ def __init__(self, address, username=None, password=None, debug=False): """ self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] self.address = urlparse(address) - self.mgmt_node = b"$management" + self.eh_name = self.address.path.lstrip('/') + self.mgmt_target = "amqps://{}/{}".format(self.address.hostname, self.eh_name) url_username = unquote_plus(self.address.username) if self.address.username else None username = username or url_username url_password = unquote_plus(self.address.password) if self.address.password else None @@ -117,8 +118,7 @@ def __init__(self, address, username=None, password=None, debug=False): raise ValueError("Missing username and/or password.") self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) self._auth_config = {'username': username, 'password': password} - self.auth = self._create_auth() - self.connection = None + self.get_auth = functools.partial(self._create_auth) self.debug = debug self.clients = [] @@ -148,10 +148,10 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): password = _generate_sas_token(address, policy, key) client = cls("amqps://" + address, username=username, password=password, **kwargs) client._auth_config = { - 'username': policy, - 'password': key, - 'iot_username': username, - 'iot_password': password} # pylint: disable=protected-access + 'iot_username': policy, + 'iot_password': key, + 'username': username, + 'password': password} # pylint: disable=protected-access return client def _create_auth(self, username=None, password=None): @@ -172,7 +172,7 @@ def _create_auth(self, username=None, password=None): return authentication.SASLPlain(self.address.hostname, username, password) return authentication.SASTokenAuth.from_shared_access_key(self.auth_uri, username, password, timeout=60) - def _create_properties(self): # pylint: disable=no-self-use + def create_properties(self): # pylint: disable=no-self-use """ Format the properties with which to instantiate the connection. This acts like a user agent over HTTP. @@ -186,29 +186,6 @@ def _create_properties(self): # pylint: disable=no-self-use properties["platform"] = sys.platform return properties - def _create_connection(self): - """ - Create a new ~uamqp.connection.Connection instance that will be shared between all - Sender/Receiver clients. - """ - if not self.connection: - log.info("{}: Creating connection with address={}".format( - self.container_id, self.address.geturl())) - self.connection = Connection( - self.address.hostname, - self.auth, - container_id=self.container_id, - properties=self._create_properties(), - debug=self.debug) - - def _close_connection(self): - """ - Close and destroy the connection. - """ - if self.connection: - self.connection.destroy() - self.connection = None - def _close_clients(self): """ Close all open Sender/Receiver clients. @@ -219,25 +196,26 @@ def _close_clients(self): def _start_clients(self): for client in self.clients: try: - client.open(self.connection) - while not client.has_started(): - self.connection.work() + client.open() except Exception as exp: # pylint: disable=broad-except client.close(exception=exp) + def _process_redirect_uri(self, redirect): + redirect_uri = redirect.address.decode('utf-8') + auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") + self.address = urlparse(auth_uri) + self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) + self.eh_name = self.address.path.lstrip('/') + self.mgmt_target = redirect_uri + def _handle_redirect(self, redirects): if len(redirects) != len(self.clients): raise EventHubError("Some clients are attempting to redirect the connection.") if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self.auth_uri = redirects[0].address.decode('utf-8') - self.auth = self._create_auth() - new_target, _, _ = self.auth_uri.partition("/ConsumerGroups") - self.address = urlparse(new_target) - self.mgmt_node = new_target.encode('UTF-8') + b"/$management" - self.connection.redirect(redirects[0], self.auth) + self._process_redirect_uri(redirects[0]) for client in self.clients: - client.open(self.connection) + client.open() def run(self): """ @@ -252,7 +230,6 @@ def run(self): :rtype: list[~azure.eventhub.common.EventHubError] """ log.info("{}: Starting {} clients".format(self.container_id, len(self.clients))) - self._create_connection() try: self._start_clients() redirects = [c.redirected for c in self.clients if c.redirected] @@ -279,7 +256,6 @@ def stop(self): log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients))) self.stopped = True self._close_clients() - self._close_connection() def get_eventhub_info(self): """ @@ -293,13 +269,14 @@ def get_eventhub_info(self): :rtype: dict """ - eh_name = self.address.path.lstrip('/') - target = "amqps://{}/{}".format(self.address.hostname, eh_name) - mgmt_auth = self._create_auth() - mgmt_client = uamqp.AMQPClient(target, auth=mgmt_auth, debug=self.debug) + alt_creds = { + "username": self._auth_config.get("iot_username"), + "password":self._auth_config.get("iot_password")} try: + mgmt_auth = self._create_auth(**alt_creds) + mgmt_client = uamqp.AMQPClient(self.mgmt_target, auth=mgmt_auth, debug=self.debug) mgmt_client.open() - mgmt_msg = Message(application_properties={'name': eh_name}) + mgmt_msg = Message(application_properties={'name': self.eh_name}) response = mgmt_client.mgmt_request( mgmt_msg, constants.READ_OPERATION, @@ -340,10 +317,7 @@ def add_receiver(self, consumer_group, partition, offset=None, prefetch=300, ope path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - source = Source(source_url) - if offset is not None: - source.set_filter(offset.selector()) - handler = Receiver(self, source, prefetch=prefetch) + handler = Receiver(self, source_url, offset=offset, prefetch=prefetch) self.clients.append(handler) return handler diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index f14e778..a89eebc 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -10,6 +10,13 @@ from uamqp import types, constants, errors from uamqp.message import MessageHeader, MessageProperties +_NO_RETRY_ERRORS = ( + b"com.microsoft:argument-out-of-range", + b"com.microsoft:entity-disabled", + b"com.microsoft:auth-failed", + b"com.microsoft:precondition-failed", + b"com.microsoft:argument-error" +) def _error_handler(error): """ @@ -29,7 +36,9 @@ def _error_handler(error): elif error.condition == b'com.microsoft:operation-cancelled': return errors.ErrorAction(retry=True) elif error.condition == b"com.microsoft:container-close": - return errors.ErrorAction(retry=True) + return errors.ErrorAction(retry=True, backoff=4) + elif error.condition in _NO_RETRY_ERRORS: + return errors.ErrorAction(retry=False) return errors.ErrorAction(retry=True) @@ -97,7 +106,7 @@ def offset(self): :rtype: int """ try: - return self._annotations[EventData.PROP_OFFSET].decode('UTF-8') + return Offset(self._annotations[EventData.PROP_OFFSET].decode('UTF-8')) except (KeyError, AttributeError): return None diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index a21b30f..7e0c5e6 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -4,9 +4,9 @@ # -------------------------------------------------------------------------------------------- from uamqp import types, errors -from uamqp import ReceiveClient +from uamqp import ReceiveClient, Source -from azure.eventhub.common import EventHubError, EventData, Offset +from azure.eventhub.common import EventHubError, EventData, Offset, _error_handler class Receiver: @@ -16,38 +16,46 @@ class Receiver: timeout = 0 _epoch = b'com.microsoft:epoch' - def __init__(self, client, source, prefetch=300, epoch=None): + def __init__(self, client, source, offset=None, prefetch=300, epoch=None): """ Instantiate a receiver. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient :param source: The source EventHub from which to receive events. - :type source: ~uamqp.address.Source + :type source: str :param prefetch: The number of events to prefetch from the service for processing. Default is 300. :type prefetch: int :param epoch: An optional epoch value. :type epoch: int """ - self.offset = None + self.client = client + self.source = source + self.offset = offset self.prefetch = prefetch self.epoch = epoch + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.properties = None self.redirected = None - self.debug = client.debug self.error = None + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) if epoch: self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))} self._handler = ReceiveClient( source, - auth=client.auth, - debug=self.debug, + auth=self.client.get_auth(), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, - timeout=self.timeout) + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) - def open(self, connection): + def open(self): """ Open the Receiver using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -57,14 +65,50 @@ def open(self, connection): :type: connection: ~uamqp.connection.Connection """ if self.redirected: + self.source = self.redirected.address + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = ReceiveClient( - self.redirected.address, - auth=None, - debug=self.debug, + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, - timeout=self.timeout) - self._handler.open(connection) + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() + while not self.has_started(): + self._handler._connection.work() + + def reconnect(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + self._handler.close() + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + self._handler = ReceiveClient( + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() + while not self.has_started(): + self._handler._connection.work() def get_handler_state(self): """ @@ -146,34 +190,29 @@ def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 message_batch = self._handler.receive_message_batch( max_batch_size=max_batch_size, timeout=timeout_ms) - data_batch = [] for message in message_batch: event_data = EventData(message=message) self.offset = event_data.offset data_batch.append(event_data) return data_batch - except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - self.close(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + self.reconnect() + return data_batch + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except (errors.MessageHandlerError): + self.reconnect() + return data_batch except Exception as e: error = EventHubError("Receive failed: {}".format(e)) self.close(exception=error) raise error - - def selector(self, default): - """ - Create a selector for the current offset if it is set. - - :param default: The fallback receive offset. - :type default: ~azure.eventhub.common.Offset - :rtype: ~azure.eventhub.common.Offset - """ - if self.offset is not None: - return Offset(self.offset).selector() - return default diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index f635da8..ccc69d8 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -27,24 +27,26 @@ def __init__(self, client, target, partition=None): :param target: The URI of the EventHub to send to. :type target: str """ - self.conneciton = None + self.client = client + self.target = target + self.partition = partition self.redirected = None self.error = None - self.debug = client.debug - self.partition = partition self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) if partition: - target += "/Partitions/" + partition + self.target += "/Partitions/" + partition self._handler = SendClient( - target, - auth=client.auth, - debug=self.debug, + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, - error_policy=self.retry_policy) + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) self._outcome = None self._condition = None - def open(self, connection): + def open(self): #, connection): """ Open the Sender using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -53,29 +55,41 @@ def open(self, connection): :param connection: The underlying client shared connection. :type: connection: ~uamqp.connection.Connection """ - self.connection = connection if self.redirected: + self.target = self.redirected.address + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = SendClient( - self.redirected.address, - auth=None, - debug=self.debug, + self.target, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, - retry_policy=self.retry_policy) - self._handler.open(connection) + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() #connection) + while not self.has_started(): + self._handler._connection.work() def reconnect(self): """If the Sender was disconnected from the service with a retryable error - attempt to reconnect.""" pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler.close() self._handler = SendClient( - self.redirected.address, - auth=None, - debug=self.debug, + self.target, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, - retry_policy=self.retry_policy) - self._handler.open(self.connection) + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() self._handler._pending_messages = unsent_events self._handler.wait() @@ -158,17 +172,15 @@ def send(self, event_data): error = EventHubError(str(failed), failed) self.close(exception=error) raise error - except errors.LinkDetach as detach: - if detach.action.retry: + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: self.reconnect() else: - error = EventHubError(str(detach), detach) + error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - self.close(exception=error) - raise error + except (errors.MessageHandlerError): + self.reconnect() except Exception as e: error = EventHubError("Send failed: {}".format(e)) self.close(exception=error) @@ -202,17 +214,15 @@ def wait(self): raise self.error try: self._handler.wait() - except errors.LinkDetach as detach: - if detach.action.retry: + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: self.reconnect() else: - error = EventHubError(str(detach), detach) + error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - self.close(exception=error) - raise error + except (errors.MessageHandlerError): + self.reconnect() except Exception as e: raise EventHubError("Send failed: {}".format(e))