From 121ddef46668ce16342097192ef090d018bdce7c Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 13 Jul 2018 16:12:04 -0700 Subject: [PATCH 01/11] Working on error retry --- azure/eventhub/common.py | 40 +++++++++++++++++++++-- azure/eventhub/sender.py | 27 ++++++++++++++-- features/eventhub.feature | 13 ++++---- features/steps/eventhub.py | 18 ++++++++++- features/steps/test_utils.py | 63 ++++++++++++++++++++++++++++++++++++ 5 files changed, 149 insertions(+), 12 deletions(-) diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index 4ba972a..7098ef6 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -7,7 +7,7 @@ import time from uamqp import Message, BatchMessage -from uamqp import types +from uamqp import types, constants from uamqp.message import MessageHeader, MessageProperties @@ -208,4 +208,40 @@ class EventHubError(Exception): """ Represents an error happened in the client. """ - pass + + def __init__(self, message, details=None): + self.error = None + self.message = message + self.details = [] + if isinstance(message, constants.MessageSendResult): + self.message = "Message send failed with result: {}".format(message) + if details and isinstance(details, list) and isinstance(details[0], list): + self.details = details[0] + self.error = details[0][0] + try: + self._parse_error(details[0]) + except: + raise + if self.error: + self.message += "\nError: {}".format(self.error) + for detail in self.details: + self.message += "\n{}".format(detail) + super(EventHubError, self).__init__(self.message) + + def _parse_error(self, error_list): + details = [] + _, _, self.error = error_list[0].decode('UTF-8').partition(':') + self.message = error_list[1].decode('UTF-8') + details_index = self.message.find(" Reference:") + if details_index >= 0: + details_msg = self.message[details_index + 1:] + self.message = self.message[0:details_index] + + tracking_index = details_msg.index(", TrackingId:") + system_index = details_msg.index(", SystemTracker:") + timestamp_index = details_msg.index(", Timestamp:") + details.append(details_msg[:tracking_index]) + details.append(details_msg[tracking_index + 2: system_index]) + details.append(details_msg[system_index + 2: timestamp_index]) + details.append(details_msg[timestamp_index + 2:]) + self.details = details \ No newline at end of file diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index 358a336..0aa3e77 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -3,6 +3,9 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- +import time + +import uamqp from uamqp import constants, errors from uamqp import SendClient @@ -28,13 +31,15 @@ def __init__(self, client, target, partition=None): self.error = None self.debug = client.debug self.partition = partition + self.retry_policy = uamqp.sender.RetryPolicy(max_retries=3, on_error=self._error_handler) if partition: target += "/Partitions/" + partition self._handler = SendClient( target, auth=client.auth, debug=self.debug, - msg_timeout=Sender.TIMEOUT) + msg_timeout=Sender.TIMEOUT, + retry_policy=self.retry_policy) self._outcome = None self._condition = None @@ -52,7 +57,8 @@ def open(self, connection): self.redirected.address, auth=None, debug=self.debug, - msg_timeout=Sender.TIMEOUT) + msg_timeout=Sender.TIMEOUT, + retry_policy=self.retry_policy) self._handler.open(connection) def get_handler_state(self): @@ -180,6 +186,23 @@ def _on_outcome(self, outcome, condition): self._outcome = outcome self._condition = condition + def _error_handler(self, error): + """ + Called internally when an event has failed to send so we + can parse the error to determine whether we should attempt + to retry sending the event again. + Returns the action to take according to error type. + + :param error: The error received in the send attempt. + :type error: list[list[bytes]] + :rtype: ~uamqp.sender.SendFailedAction + """ + if isinstance(error, list) and isinstance(error[0], list): + error_type = error[0][0].decode('UTF-8') + if error_type == 'com.microsoft:server-busy': + return uamqp.sender.SendFailedAction(retry=True, backoff=4) + return uamqp.sender.SendFailedAction(retry=True, backoff=4) + @staticmethod def _error(outcome, condition): return None if outcome == constants.MessageSendResult.Ok else EventHubError(outcome, condition) diff --git a/features/eventhub.feature b/features/eventhub.feature index c96a998..19bf68b 100644 --- a/features/eventhub.feature +++ b/features/eventhub.feature @@ -5,15 +5,14 @@ Feature: Exercising EventHub SDK -# Scenario: Just sends for 3 days, no receives. Focus on send failures only. - @long-running - Scenario: Generic send and receive on client for 3 days. - Given the EventHub SDK is installed - And an EventHub is created with credentials retrieved - When I send and receive messages for 72 hours + Scenario: Just sends for 3 days, no receives. Focus on send failures only. + Given The EventHub SDK is installed + And An EventHub is created with credentials retrieved + When I start a message sender + And I send messages for 72 hours Then I should receive no errors - And I can shutdown the sender and receiver cleanly + And I can shutdown sender And I remove the EventHub # Scenario: Sender stays idle for 45 minutes and sends some number of messages after each idle duration. diff --git a/features/steps/eventhub.py b/features/steps/eventhub.py index b92bdc5..01cda1d 100644 --- a/features/steps/eventhub.py +++ b/features/steps/eventhub.py @@ -5,6 +5,7 @@ import asyncio import uuid +import functools from behave import * @@ -24,7 +25,22 @@ def step_impl(context): def step_impl(context, properties): #from mgmt_settings_real import get_credentials, SUBSCRIPTION_ID #rg, mgmt_client = test_utils.create_mgmt_client(get_credentials(), SUBSCRIPTION_ID) - context.eh_config = test_utils.get_eventhub_config() + _, prop = properties.split(' ') + if prop == '100TU': + context.eh_config = test_utils.get_eventhub_100TU_config() + else: + raise ValueError("Unrecognised property: {}".format(prop)) + +@When('I start a message sender') +def step_impl(context): + from azure.eventhub import EventHubClient + address = "sb://{}/{}".format(context.eh_config['hostname'], context.eh_config['event_hub']) + context.client = EventHubClient( + address, + username=context.eh_config['key_name'], + password=context.eh_config['access_key']) + context.sender = client.add_sender() + context.client.run() @when('I {clients} messages for {hours} hours') def step_impl(context, clients, hours): diff --git a/features/steps/test_utils.py b/features/steps/test_utils.py index 31eb7b5..7cc4d34 100644 --- a/features/steps/test_utils.py +++ b/features/steps/test_utils.py @@ -4,6 +4,8 @@ # -------------------------------------------------------------------------------------------- import uuid +import time +import asyncio def create_mgmt_client(credentials, subscription, location='westus'): from azure.mgmt.resource import ResourceManagementClient @@ -32,3 +34,64 @@ def get_eventhub_config(): config['consumer_group'] = "$Default" config['partition'] = "0" return config + + +def get_eventhub_100TU_config(): + config = {} + config['hostname'] = os.environ['EVENT_HUB_100TU_HOSTNAME'] + config['event_hub'] = os.environ['EVENT_HUB_100TU_NAME'] + config['key_name'] = os.environ['EVENT_HUB_100TU_SAS_POLICY'] + config['access_key'] = os.environ['EVENT_HUB_100TU_SAS_KEY'] + config['consumer_group'] = "$Default" + config['partition'] = "0" + return config + + +def send_constant_messages(sender, timeout, payload=1024): + deadline = time.time() + total = 0 + while time.time() < deadline: + data = EventData(body=b"D" * payload) + sender.send(data) + total += 1 + return total + + +def send_constant_async_messages(sender, timeout, batch_size=10000, payload=1024): + deadline = time.time() + total = 0 + while time.time() < deadline: + data = EventData(body=b"D" * args.payload) + sender.transfer(data) + total += 1 + if total % 10000 == 0: + sender.wait() + return total + + +def send_constant_async_messages(sender, timeout, batch_size=1, payload=1024): + deadline = time.time() + while time.time() < deadline: + if batch_size > 1: + data = EventData(batch=data_generator()) + else: + data = EventData(body=b"D" * payload) + + +async def receive_pump(receiver, timeout, validation=True): + total = 0 + deadline = time.time() + timeout + sequence = 0 + offset = None + while time.time() < deadline: + batch = await receiver.receive(timeout=5) + total += len(batch) + if validation: + assert receiver.offset + for event in batch: + next_sequence = event.sequence_number + assert next_sequence > sequence, "Received Event with lower sequence number than previous." + assert (next_sequence - sequence) == 1, "Sequence number skipped by a value great than 1." + sequence = next_sequence + msg_data = b"".join([b for b in event.body]).decode('UTF-8') + assert json.loads(msg_data), "Unable to deserialize Event data." From f480b0d430228702a597eaffe624082161070988 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 19 Jul 2018 13:10:59 -0700 Subject: [PATCH 02/11] Improved error processing --- azure/eventhub/_async/__init__.py | 14 ++++++- azure/eventhub/_async/receiver_async.py | 10 ++++- azure/eventhub/_async/sender_async.py | 18 ++++++++- azure/eventhub/client.py | 4 +- azure/eventhub/common.py | 49 ++++++++++++++++++------- azure/eventhub/receiver.py | 2 +- azure/eventhub/sender.py | 41 ++++++++++----------- 7 files changed, 95 insertions(+), 43 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index e2e727a..734e7b4 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -51,7 +51,7 @@ def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self """ if "@sas.root" in username: return authentication.SASLPlain(self.address.hostname, username, password) - return authentication.SASTokenAsync.from_shared_access_key(auth_uri, username, password) + return authentication.SASTokenAsync.from_shared_access_key(auth_uri, username, password, timeout=60) def _create_connection_async(self): """ @@ -109,6 +109,10 @@ async def _handle_redirect(self, redirects): if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") self.auth = self._create_auth(redirects[0].address.decode('utf-8'), **self._auth_config) + #port = str(redirects[0].port).encode('UTF-8') + #path = self.address.path.encode('UTF-8') + #self.mgmt_node = b"pyot/$management" #+ redirects[0].hostname b"amqps://pyot.azure-devices.net" + b":" + port + + #print("setting mgmt node", self.mgmt_node) await self.connection.redirect_async(redirects[0], self.auth) await asyncio.gather(*[c.open_async(self.connection) for c in self.clients]) @@ -161,14 +165,18 @@ async def get_eventhub_info_async(self): :rtype: dict """ + self._create_connection_async() eh_name = self.address.path.lstrip('/') target = "amqps://{}/{}".format(self.address.hostname, eh_name) - async with AMQPClientAsync(target, auth=self.auth, debug=self.debug) as mgmt_client: + try: + mgmt_client = AMQPClientAsync(target, auth=self.auth, debug=self.debug) + await mgmt_client.open_async(connection=self.connection) mgmt_msg = Message(application_properties={'name': eh_name}) response = await mgmt_client.mgmt_request_async( mgmt_msg, constants.READ_OPERATION, op_type=b'com.microsoft:eventhub', + node=self.mgmt_node, status_code_field=b'status-code', description_fields=b'status-description') eh_info = response.get_data() @@ -180,6 +188,8 @@ async def get_eventhub_info_async(self): output['partition_count'] = eh_info[b'partition_count'] output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']] return output + finally: + await mgmt_client.close_async() def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=300, operation=None, loop=None): """ diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/_async/receiver_async.py index 2ceb518..b3b5138 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/_async/receiver_async.py @@ -88,7 +88,7 @@ async def has_started(self): raise EventHubError("Authorization timeout.") elif auth_in_progress: return False - elif not await self._handler._client_ready(): + elif not await self._handler._client_ready_async(): return False else: return True @@ -109,6 +109,8 @@ async def close_async(self, exception=None): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + self.error = EventHubError(str(exception), exception) elif exception: self.error = EventHubError(str(exception)) else: @@ -141,7 +143,11 @@ async def receive(self, max_batch_size=None, timeout=None): data_batch.append(event_data) return data_batch except errors.LinkDetach as detach: - error = EventHubError(str(detach)) + error = EventHubError(str(detach), detach) + await self.close_async(exception=error) + raise error + except errors.ConnectionClose as close: + error = EventHubError(str(close), close) await self.close_async(exception=error) raise error except Exception as e: diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index 3e57e3c..abcd0fa 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -76,7 +76,7 @@ async def has_started(self): raise EventHubError("Authorization timeout.") elif auth_in_progress: return False - elif not await self._handler._client_ready(): + elif not await self._handler._client_ready_async(): return False else: return True @@ -97,6 +97,8 @@ async def close_async(self, exception=None): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception + elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): + self.error = EventHubError(str(error), error) elif exception: self.error = EventHubError(str(exception)) else: @@ -123,7 +125,11 @@ async def send(self, event_data): if self._outcome != constants.MessageSendResult.Ok: raise Sender._error(self._outcome, self._condition) except errors.LinkDetach as detach: - error = EventHubError(str(detach)) + error = EventHubError(str(detach), detach) + await self.close_async(exception=error) + raise error + except errors.ConnectionClose as close: + error = EventHubError(str(close), close) await self.close_async(exception=error) raise error except Exception as e: @@ -141,5 +147,13 @@ async def wait_async(self): raise self.error try: await self._handler.wait_async() + except errors.LinkDetach as detach: + error = EventHubError(str(detach), detach) + await self.close_async(exception=error) + raise error + except errors.ConnectionClose as close: + error = EventHubError(str(close), close) + await self.close_async(exception=error) + raise error except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 6e37dfe..8a47a03 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -108,6 +108,7 @@ def __init__(self, address, username=None, password=None, debug=False): """ self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] self.address = urlparse(address) + self.mgmt_node = b"$management" url_username = unquote_plus(self.address.username) if self.address.username else None username = username or url_username url_password = unquote_plus(self.address.password) if self.address.password else None @@ -147,6 +148,7 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): password = _generate_sas_token(address, policy, key) client = cls("amqps://" + address, username=username, password=password, **kwargs) client._auth_config = {'username': policy, 'password': key} # pylint: disable=protected-access + client.mgmt_node = ("amqps://" + address + ":5671/pyot/$management").encode('UTF-8') return client def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self-use @@ -163,7 +165,7 @@ def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self """ if "@sas.root" in username: return authentication.SASLPlain(self.address.hostname, username, password) - return authentication.SASTokenAuth.from_shared_access_key(auth_uri, username, password) + return authentication.SASTokenAuth.from_shared_access_key(auth_uri, username, password, timeout=60) def _create_properties(self): # pylint: disable=no-self-use """ diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index 7098ef6..9c3c2f4 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -7,10 +7,30 @@ import time from uamqp import Message, BatchMessage -from uamqp import types, constants +from uamqp import types, constants, errors from uamqp.message import MessageHeader, MessageProperties +def _error_handler(error): + """ + Called internally when an event has failed to send so we + can parse the error to determine whether we should attempt + to retry sending the event again. + Returns the action to take according to error type. + + :param error: The error received in the send attempt. + :type error: Exception + :rtype: ~uamqp.errors.ErrorAction + """ + if error.condition == b'com.microsoft:server-busy': + return errors.ErrorAction(retry=True, backoff=4) + elif error.condition == b'com.microsoft:timeout': + return errors.ErrorAction(retry=True, backoff=2) + elif error.condition == b'com.microsoft:operation-cancelled': + return errors.ErrorAction(retry=True) + return errors.ErrorAction(retry=True) + + class EventData(object): """ The EventData class is a holder of event content. @@ -212,26 +232,27 @@ class EventHubError(Exception): def __init__(self, message, details=None): self.error = None self.message = message - self.details = [] + self.details = details if isinstance(message, constants.MessageSendResult): self.message = "Message send failed with result: {}".format(message) - if details and isinstance(details, list) and isinstance(details[0], list): - self.details = details[0] - self.error = details[0][0] + if details and isinstance(details, Exception): try: - self._parse_error(details[0]) - except: - raise - if self.error: + condition = details.condition.value.decode('UTF-8') + except AttributeError: + condition = details.condition.decode('UTF-8') + _, _, self.error = condition.partition(':') self.message += "\nError: {}".format(self.error) - for detail in self.details: - self.message += "\n{}".format(detail) + try: + self._parse_error(details.description) + for detail in self.details: + self.message += "\n{}".format(detail) + except: + self.message += "\n{}".format(details) super(EventHubError, self).__init__(self.message) def _parse_error(self, error_list): details = [] - _, _, self.error = error_list[0].decode('UTF-8').partition(':') - self.message = error_list[1].decode('UTF-8') + self.message = error_list if isinstance(error_list, str) else error_list.decode('UTF-8') details_index = self.message.find(" Reference:") if details_index >= 0: details_msg = self.message[details_index + 1:] @@ -244,4 +265,4 @@ def _parse_error(self, error_list): details.append(details_msg[tracking_index + 2: system_index]) details.append(details_msg[system_index + 2: timestamp_index]) details.append(details_msg[timestamp_index + 2:]) - self.details = details \ No newline at end of file + self.details = details diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index 3cef829..a21b30f 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -158,7 +158,7 @@ def receive(self, max_batch_size=None, timeout=None): data_batch.append(event_data) return data_batch except errors.LinkDetach as detach: - error = EventHubError(str(detach)) + error = EventHubError(str(detach), detach) self.close(exception=error) raise error except Exception as e: diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index 0aa3e77..83a6c4c 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -9,7 +9,7 @@ from uamqp import constants, errors from uamqp import SendClient -from azure.eventhub.common import EventHubError +from azure.eventhub.common import EventHubError, _error_handler class Sender: @@ -31,7 +31,7 @@ def __init__(self, client, target, partition=None): self.error = None self.debug = client.debug self.partition = partition - self.retry_policy = uamqp.sender.RetryPolicy(max_retries=3, on_error=self._error_handler) + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) if partition: target += "/Partitions/" + partition self._handler = SendClient( @@ -39,7 +39,7 @@ def __init__(self, client, target, partition=None): auth=client.auth, debug=self.debug, msg_timeout=Sender.TIMEOUT, - retry_policy=self.retry_policy) + error_policy=self.retry_policy) self._outcome = None self._condition = None @@ -136,8 +136,16 @@ def send(self, event_data): self._handler.send_message(event_data.message) if self._outcome != constants.MessageSendResult.Ok: raise Sender._error(self._outcome, self._condition) + except errors.MessageException as failed: + error = EventHubError(str(failed), failed) + self.close(exception=error) + raise error except errors.LinkDetach as detach: - error = EventHubError(str(detach)) + error = EventHubError(str(detach), detach) + self.close(exception=error) + raise error + except errors.ConnectionClose as close: + error = EventHubError(str(close), close) self.close(exception=error) raise error except Exception as e: @@ -173,6 +181,14 @@ def wait(self): raise self.error try: self._handler.wait() + except errors.LinkDetach as detach: + error = EventHubError(str(detach), detach) + self.close(exception=error) + raise error + except errors.ConnectionClose as close: + error = EventHubError(str(close), close) + self.close(exception=error) + raise error except Exception as e: raise EventHubError("Send failed: {}".format(e)) @@ -186,23 +202,6 @@ def _on_outcome(self, outcome, condition): self._outcome = outcome self._condition = condition - def _error_handler(self, error): - """ - Called internally when an event has failed to send so we - can parse the error to determine whether we should attempt - to retry sending the event again. - Returns the action to take according to error type. - - :param error: The error received in the send attempt. - :type error: list[list[bytes]] - :rtype: ~uamqp.sender.SendFailedAction - """ - if isinstance(error, list) and isinstance(error[0], list): - error_type = error[0][0].decode('UTF-8') - if error_type == 'com.microsoft:server-busy': - return uamqp.sender.SendFailedAction(retry=True, backoff=4) - return uamqp.sender.SendFailedAction(retry=True, backoff=4) - @staticmethod def _error(outcome, condition): return None if outcome == constants.MessageSendResult.Ok else EventHubError(outcome, condition) From b681df5e6eb9278a1390bbf87757a3caeb0c2e64 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 19 Jul 2018 13:30:43 -0700 Subject: [PATCH 03/11] Fixed partition manager --- azure/eventprocessorhost/partition_manager.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/azure/eventprocessorhost/partition_manager.py b/azure/eventprocessorhost/partition_manager.py index 2ae402e..da58423 100644 --- a/azure/eventprocessorhost/partition_manager.py +++ b/azure/eventprocessorhost/partition_manager.py @@ -35,15 +35,17 @@ async def get_partition_ids_async(self): :rtype: list[str] """ if not self.partition_ids: - eh_client = EventHubClientAsync( - self.host.eh_config.client_address, - debug=self.host.eph_options.debug_trace) try: - eh_info = await eh_client.get_eventhub_info_async() - self.partition_ids = eh_info['partition_ids'] - except Exception as err: # pylint: disable=broad-except - raise Exception("Failed to get partition ids", repr(err)) - + eh_client = EventHubClientAsync( + self.host.eh_config.client_address, + debug=self.host.eph_options.debug_trace) + try: + eh_info = await eh_client.get_eventhub_info_async() + self.partition_ids = eh_info['partition_ids'] + except Exception as err: # pylint: disable=broad-except + raise Exception("Failed to get partition ids", repr(err)) + finally: + await eh_client.stop_async() return self.partition_ids async def start_async(self): From 90a9e774e30dc6cff45370fd8c5486b4af3980aa Mon Sep 17 00:00:00 2001 From: annatisch Date: Fri, 20 Jul 2018 07:32:33 -0700 Subject: [PATCH 04/11] Progress on IotHub error --- azure/eventhub/_async/__init__.py | 26 ++++++++++++--------- azure/eventhub/client.py | 31 ++++++++++++++++--------- azure/eventhub/common.py | 2 ++ azure/eventhub/sender.py | 36 +++++++++++++++++++++++++----- conftest.py | 2 +- tests/__init__.py | 2 +- tests/test_iothub_receive_async.py | 15 +++++-------- tests/test_iothub_send.py | 2 ++ tests/test_negative.py | 6 +++-- tests/test_receive_async.py | 2 ++ 10 files changed, 84 insertions(+), 40 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index 734e7b4..e961302 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -7,6 +7,10 @@ import asyncio import time import datetime +try: + from urllib import urlparse, unquote_plus, urlencode, quote_plus +except ImportError: + from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus from uamqp import authentication, constants, types, errors from uamqp import ( @@ -37,7 +41,7 @@ class EventHubClientAsync(EventHubClient): sending events to and receiving events from the Azure Event Hubs service. """ - def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self-use + def _create_auth(self, username=None, password=None): # pylint: disable=no-self-use """ Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate the session. @@ -49,9 +53,11 @@ def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self :param password: The shared access key. :type password: str """ + username = username or self._auth_config['username'] + password = password or self._auth_config['password'] if "@sas.root" in username: return authentication.SASLPlain(self.address.hostname, username, password) - return authentication.SASTokenAsync.from_shared_access_key(auth_uri, username, password, timeout=60) + return authentication.SASTokenAsync.from_shared_access_key(self.auth_uri, username, password, timeout=60) def _create_connection_async(self): """ @@ -108,11 +114,11 @@ async def _handle_redirect(self, redirects): redirects = [c.redirected for c in self.clients if c.redirected] if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self.auth = self._create_auth(redirects[0].address.decode('utf-8'), **self._auth_config) - #port = str(redirects[0].port).encode('UTF-8') - #path = self.address.path.encode('UTF-8') - #self.mgmt_node = b"pyot/$management" #+ redirects[0].hostname b"amqps://pyot.azure-devices.net" + b":" + port + - #print("setting mgmt node", self.mgmt_node) + self.auth_uri = redirects[0].address.decode('utf-8') + self.auth = self._create_auth() + new_target, _, _ = self.auth_uri.partition("/ConsumerGroups") + self.address = urlparse(new_target) + self.mgmt_node = new_target.encode('UTF-8') + b"/$management" await self.connection.redirect_async(redirects[0], self.auth) await asyncio.gather(*[c.open_async(self.connection) for c in self.clients]) @@ -165,12 +171,12 @@ async def get_eventhub_info_async(self): :rtype: dict """ - self._create_connection_async() eh_name = self.address.path.lstrip('/') target = "amqps://{}/{}".format(self.address.hostname, eh_name) try: - mgmt_client = AMQPClientAsync(target, auth=self.auth, debug=self.debug) - await mgmt_client.open_async(connection=self.connection) + mgmt_auth = self._create_auth() + mgmt_client = AMQPClientAsync(target, auth=mgmt_auth, debug=self.debug) + await mgmt_client.open_async() mgmt_msg = Message(application_properties={'name': eh_name}) response = await mgmt_client.mgmt_request_async( mgmt_msg, diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 8a47a03..d85df56 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -115,9 +115,9 @@ def __init__(self, address, username=None, password=None, debug=False): password = password or url_password if not username or not password: raise ValueError("Missing username and/or password.") - auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) - self.auth = self._create_auth(auth_uri, username, password) - self._auth_config = None + self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) + self._auth_config = {'username': username, 'password': password} + self.auth = self._create_auth() self.connection = None self.debug = debug @@ -147,11 +147,14 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): username = "{}@sas.root.{}".format(policy, hub_name) password = _generate_sas_token(address, policy, key) client = cls("amqps://" + address, username=username, password=password, **kwargs) - client._auth_config = {'username': policy, 'password': key} # pylint: disable=protected-access - client.mgmt_node = ("amqps://" + address + ":5671/pyot/$management").encode('UTF-8') + client._auth_config = { + 'username': policy, + 'password': key, + 'iot_username': username, + 'iot_password': password} # pylint: disable=protected-access return client - def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self-use + def _create_auth(self, username=None, password=None): """ Create an ~uamqp.authentication.SASTokenAuth instance to authenticate the session. @@ -163,9 +166,11 @@ def _create_auth(self, auth_uri, username, password): # pylint: disable=no-self :param password: The shared access key. :type password: str """ + username = username or self._auth_config['username'] + password = password or self._auth_config['password'] if "@sas.root" in username: return authentication.SASLPlain(self.address.hostname, username, password) - return authentication.SASTokenAuth.from_shared_access_key(auth_uri, username, password, timeout=60) + return authentication.SASTokenAuth.from_shared_access_key(self.auth_uri, username, password, timeout=60) def _create_properties(self): # pylint: disable=no-self-use """ @@ -225,7 +230,11 @@ def _handle_redirect(self, redirects): raise EventHubError("Some clients are attempting to redirect the connection.") if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self.auth = self._create_auth(redirects[0].address.decode('utf-8'), **self._auth_config) + self.auth_uri = redirects[0].address.decode('utf-8') + self.auth = self._create_auth() + new_target, _, _ = self.auth_uri.partition("/ConsumerGroups") + self.address = urlparse(new_target) + self.mgmt_node = new_target.encode('UTF-8') + b"/$management" self.connection.redirect(redirects[0], self.auth) for client in self.clients: client.open(self.connection) @@ -284,12 +293,12 @@ def get_eventhub_info(self): :rtype: dict """ - self._create_connection() eh_name = self.address.path.lstrip('/') target = "amqps://{}/{}".format(self.address.hostname, eh_name) - mgmt_client = uamqp.AMQPClient(target, auth=self.auth, debug=self.debug) - mgmt_client.open(self.connection) + mgmt_auth = self._create_auth() + mgmt_client = uamqp.AMQPClient(target, auth=mgmt_auth, debug=self.debug) try: + mgmt_client.open() mgmt_msg = Message(application_properties={'name': eh_name}) response = mgmt_client.mgmt_request( mgmt_msg, diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index 9c3c2f4..f14e778 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -28,6 +28,8 @@ def _error_handler(error): return errors.ErrorAction(retry=True, backoff=2) elif error.condition == b'com.microsoft:operation-cancelled': return errors.ErrorAction(retry=True) + elif error.condition == b"com.microsoft:container-close": + return errors.ErrorAction(retry=True) return errors.ErrorAction(retry=True) diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index 83a6c4c..f635da8 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -27,6 +27,7 @@ def __init__(self, client, target, partition=None): :param target: The URI of the EventHub to send to. :type target: str """ + self.conneciton = None self.redirected = None self.error = None self.debug = client.debug @@ -52,6 +53,7 @@ def open(self, connection): :param connection: The underlying client shared connection. :type: connection: ~uamqp.connection.Connection """ + self.connection = connection if self.redirected: self._handler = SendClient( self.redirected.address, @@ -61,6 +63,22 @@ def open(self, connection): retry_policy=self.retry_policy) self._handler.open(connection) + def reconnect(self): + """If the Sender was disconnected from the service with + a retryable error - attempt to reconnect.""" + pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) + unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] + self._handler.close() + self._handler = SendClient( + self.redirected.address, + auth=None, + debug=self.debug, + msg_timeout=Sender.TIMEOUT, + retry_policy=self.retry_policy) + self._handler.open(self.connection) + self._handler._pending_messages = unsent_events + self._handler.wait() + def get_handler_state(self): """ Get the state of the underlying handler with regards to start @@ -141,9 +159,12 @@ def send(self, event_data): self.close(exception=error) raise error except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - self.close(exception=error) - raise error + if detach.action.retry: + self.reconnect() + else: + error = EventHubError(str(detach), detach) + self.close(exception=error) + raise error except errors.ConnectionClose as close: error = EventHubError(str(close), close) self.close(exception=error) @@ -182,9 +203,12 @@ def wait(self): try: self._handler.wait() except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - self.close(exception=error) - raise error + if detach.action.retry: + self.reconnect() + else: + error = EventHubError(str(detach), detach) + self.close(exception=error) + raise error except errors.ConnectionClose as close: error = EventHubError(str(close), close) self.close(exception=error) diff --git a/conftest.py b/conftest.py index 6932e27..5fcf292 100644 --- a/conftest.py +++ b/conftest.py @@ -79,7 +79,7 @@ def invalid_policy(): @pytest.fixture() def iot_connection_str(): try: - return os.environ['IOT_HUB_CONNECTION_STR'] + return os.environ['IOTHUB_CONNECTION_STR'] except KeyError: pytest.skip("No IotHub connection string found.") diff --git a/tests/__init__.py b/tests/__init__.py index 7b7c91a..7ec7d3b 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,7 +12,7 @@ def get_logger(filename, level=logging.INFO): azure_logger = logging.getLogger("azure") azure_logger.setLevel(level) uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.INFO) + uamqp_logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') console_handler = logging.StreamHandler(stream=sys.stdout) diff --git a/tests/test_iothub_receive_async.py b/tests/test_iothub_receive_async.py index a7126d3..e1e718a 100644 --- a/tests/test_iothub_receive_async.py +++ b/tests/test_iothub_receive_async.py @@ -25,13 +25,14 @@ async def pump(receiver, sleep=None): @pytest.mark.asyncio -async def test_iothub_receive_async(iot_connection_str): +async def test_iothub_receive_multiple_async(iot_connection_str): client = EventHubClientAsync.from_iothub_connection_string(iot_connection_str, debug=True) - receivers = [] - for i in range(2): - receivers.append(client.add_async_receiver("$default", "0", prefetch=1000, operation='/messages/events')) - await client.run_async() try: + receivers = [] + for i in range(2): + receivers.append(client.add_async_receiver("$default", "0", prefetch=1000, operation='/messages/events')) + await client.run_async() + partitions = await client.get_eventhub_info_async() outputs = await asyncio.gather( pump(receivers[0]), pump(receivers[1]), @@ -39,8 +40,6 @@ async def test_iothub_receive_async(iot_connection_str): assert isinstance(outputs[0], int) and outputs[0] == 0 assert isinstance(outputs[1], int) and outputs[1] == 0 - except: - raise finally: await client.stop_async() @@ -60,7 +59,5 @@ async def test_iothub_receive_detach_async(iot_connection_str): assert isinstance(outputs[0], int) and outputs[0] == 0 assert isinstance(outputs[1], EventHubError) - except: - raise finally: await client.stop_async() \ No newline at end of file diff --git a/tests/test_iothub_send.py b/tests/test_iothub_send.py index 7c0dd7c..13c4eef 100644 --- a/tests/test_iothub_send.py +++ b/tests/test_iothub_send.py @@ -19,6 +19,8 @@ def test_iothub_send_single_event(iot_connection_str, device_id): client = EventHubClient.from_iothub_connection_string(iot_connection_str, debug=True) sender = client.add_sender(operation='/messages/devicebound') try: + with pytest.raises(NotImplementedError): + partitions = client.get_eventhub_info() client.run() outcome = sender.send(EventData(b"A single event", to_device=device_id)) assert outcome.value == 0 diff --git a/tests/test_negative.py b/tests/test_negative.py index 753f2e2..fc41287 100644 --- a/tests/test_negative.py +++ b/tests/test_negative.py @@ -221,7 +221,7 @@ async def test_send_to_invalid_partitions_async(connection_str): def test_send_too_large_message(connection_str): - client = EventHubClient.from_connection_string(connection_str, debug=False) + client = EventHubClient.from_connection_string(connection_str, debug=True) sender = client.add_sender() try: client.run() @@ -299,6 +299,8 @@ async def test_max_receivers_async(connection_str, senders): pump(receivers[5]), return_exceptions=True) print(outputs) - assert len([o for o in outputs if isinstance(o, EventHubError)]) == 1 + failed = [o for o in outputs if isinstance(o, EventHubError)] + assert len(failed) == 1 + print(failed[0].message) finally: await client.stop_async() \ No newline at end of file diff --git a/tests/test_receive_async.py b/tests/test_receive_async.py index d002674..8cfb370 100644 --- a/tests/test_receive_async.py +++ b/tests/test_receive_async.py @@ -216,11 +216,13 @@ async def test_epoch_receiver_async(connection_str, senders): @pytest.mark.asyncio async def test_multiple_receiver_async(connection_str, senders): client = EventHubClientAsync.from_connection_string(connection_str, debug=True) + partitions = await client.get_eventhub_info_async() receivers = [] for i in range(2): receivers.append(client.add_async_receiver("$default", "0", prefetch=10)) try: await client.run_async() + more_partitions = await client.get_eventhub_info_async() outputs = await asyncio.gather( pump(receivers[0]), pump(receivers[1]), From d98daa464fc153e97117e596bf401bd9958f66f4 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 26 Jul 2018 11:51:06 -0700 Subject: [PATCH 05/11] Some test updates --- tests/test_iothub_receive.py | 2 +- tests/test_iothub_receive_async.py | 37 +++----- tests/test_iothub_send.py | 3 +- tests/test_longrunning_receive.py | 2 +- tests/test_longrunning_send_async.py | 111 +++++++++++++++++++++++ tests/test_negative.py | 2 +- tests/test_receive.py | 11 ++- tests/test_receive_async.py | 6 +- tests/test_reconnect.py | 128 +++++++++++++++++++++++++++ tests/test_send.py | 4 +- 10 files changed, 269 insertions(+), 37 deletions(-) create mode 100644 tests/test_longrunning_send_async.py create mode 100644 tests/test_reconnect.py diff --git a/tests/test_iothub_receive.py b/tests/test_iothub_receive.py index 78c1de8..a48274b 100644 --- a/tests/test_iothub_receive.py +++ b/tests/test_iothub_receive.py @@ -11,7 +11,7 @@ from azure import eventhub from azure.eventhub import EventData, EventHubClient, Offset -def test_iothub_receive(iot_connection_str, device_id): +def test_iothub_receive_sync(iot_connection_str, device_id): client = EventHubClient.from_iothub_connection_string(iot_connection_str, debug=True) receiver = client.add_receiver("$default", "0", operation='/messages/events') try: diff --git a/tests/test_iothub_receive_async.py b/tests/test_iothub_receive_async.py index e1e718a..d26c00c 100644 --- a/tests/test_iothub_receive_async.py +++ b/tests/test_iothub_receive_async.py @@ -24,40 +24,29 @@ async def pump(receiver, sleep=None): return messages -@pytest.mark.asyncio -async def test_iothub_receive_multiple_async(iot_connection_str): - client = EventHubClientAsync.from_iothub_connection_string(iot_connection_str, debug=True) +async def get_partitions(iot_connection_str): try: - receivers = [] - for i in range(2): - receivers.append(client.add_async_receiver("$default", "0", prefetch=1000, operation='/messages/events')) + client = EventHubClientAsync.from_iothub_connection_string(iot_connection_str, debug=True) + client.add_async_receiver("$default", "0", prefetch=1000, operation='/messages/events') await client.run_async() partitions = await client.get_eventhub_info_async() - outputs = await asyncio.gather( - pump(receivers[0]), - pump(receivers[1]), - return_exceptions=True) - - assert isinstance(outputs[0], int) and outputs[0] == 0 - assert isinstance(outputs[1], int) and outputs[1] == 0 + return partitions["partition_ids"] finally: await client.stop_async() @pytest.mark.asyncio -async def test_iothub_receive_detach_async(iot_connection_str): +async def test_iothub_receive_multiple_async(iot_connection_str): + partitions = await get_partitions(iot_connection_str) client = EventHubClientAsync.from_iothub_connection_string(iot_connection_str, debug=True) - receivers = [] - for i in range(2): - receivers.append(client.add_async_receiver("$default", str(i), prefetch=1000, operation='/messages/events')) - await client.run_async() try: - outputs = await asyncio.gather( - pump(receivers[0]), - pump(receivers[1]), - return_exceptions=True) + receivers = [] + for p in partitions: + receivers.append(client.add_async_receiver("$default", p, prefetch=1000, operation='/messages/events')) + await client.run_async() + outputs = await asyncio.gather(*[pump(r) for r in receivers]) assert isinstance(outputs[0], int) and outputs[0] == 0 - assert isinstance(outputs[1], EventHubError) + assert isinstance(outputs[1], int) and outputs[1] == 0 finally: - await client.stop_async() \ No newline at end of file + await client.stop_async() diff --git a/tests/test_iothub_send.py b/tests/test_iothub_send.py index 13c4eef..3f39c61 100644 --- a/tests/test_iothub_send.py +++ b/tests/test_iothub_send.py @@ -19,9 +19,8 @@ def test_iothub_send_single_event(iot_connection_str, device_id): client = EventHubClient.from_iothub_connection_string(iot_connection_str, debug=True) sender = client.add_sender(operation='/messages/devicebound') try: - with pytest.raises(NotImplementedError): - partitions = client.get_eventhub_info() client.run() + partitions = client.get_eventhub_info() outcome = sender.send(EventData(b"A single event", to_device=device_id)) assert outcome.value == 0 except: diff --git a/tests/test_longrunning_receive.py b/tests/test_longrunning_receive.py index 60007a5..b32731b 100644 --- a/tests/test_longrunning_receive.py +++ b/tests/test_longrunning_receive.py @@ -48,7 +48,7 @@ async def pump(_pid, receiver, _args, _dl): _pid, total, batch[-1].sequence_number, - batch[-1].offset)) + batch[-1].offset.value)) print("{}: total received {}".format( _pid, total)) diff --git a/tests/test_longrunning_send_async.py b/tests/test_longrunning_send_async.py new file mode 100644 index 0000000..afc13fa --- /dev/null +++ b/tests/test_longrunning_send_async.py @@ -0,0 +1,111 @@ +#!/usr/bin/env python + +""" +send test +""" + +import logging +import argparse +import time +import threading +import os +import asyncio + +from azure.eventhub import EventHubClientAsync, EventData + +try: + import tests + logger = tests.get_logger("send_test.log", logging.INFO) +except ImportError: + logger = logging.getLogger("uamqp") + logger.setLevel(logging.INFO) + + +def check_send_successful(outcome, condition): + if outcome.value != 0: + print("Send failed {}".format(condition)) + + +async def get_partitions(args): + #client = EventHubClientAsync.from_connection_string( + # args.conn_str, + # eventhub=args.eventhub, debug=True) + eh_data = await args.get_eventhub_info_async() + return eh_data["partition_ids"] + + +async def pump(pid, sender, args, duration): + deadline = time.time() + duration + total = 0 + + def data_generator(): + for i in range(args.batch): + yield b"D" * args.payload + + if args.batch > 1: + logger.error("Sending batched messages") + else: + logger.error("Sending single messages") + + try: + while time.time() < deadline: + if args.batch > 1: + data = EventData(batch=data_generator()) + else: + data = EventData(body=b"D" * args.payload) + sender.transfer(data, callback=check_send_successful) + total += args.batch + if total % 10000 == 0: + await sender.wait_async() + logger.error("Send total {}".format(total)) + except Exception as err: + logger.error("Send failed {}".format(err)) + logger.error("Sent total {}".format(total)) + + +def test_long_running_partition_send_async(): + parser = argparse.ArgumentParser() + parser.add_argument("--duration", help="Duration in seconds of the test", type=int, default=30) + parser.add_argument("--payload", help="payload size", type=int, default=512) + parser.add_argument("--batch", help="Number of events to send and wait", type=int, default=1) + parser.add_argument("--partitions", help="Comma seperated partition IDs") + parser.add_argument("--conn-str", help="EventHub connection string", default=os.environ.get('EVENT_HUB_CONNECTION_STR')) + parser.add_argument("--eventhub", help="Name of EventHub") + parser.add_argument("--address", help="Address URI to the EventHub entity") + parser.add_argument("--sas-policy", help="Name of the shared access policy to authenticate with") + parser.add_argument("--sas-key", help="Shared access key") + + loop = asyncio.get_event_loop() + args, _ = parser.parse_known_args() + if args.conn_str: + client = EventHubClientAsync.from_connection_string( + args.conn_str, + eventhub=args.eventhub, debug=True) + elif args.address: + client = EventHubClient( + args.address, + username=args.sas_policy, + password=args.sas_key) + else: + try: + import pytest + pytest.skip("Must specify either '--conn-str' or '--address'") + except ImportError: + raise ValueError("Must specify either '--conn-str' or '--address'") + + try: + if not args.partitions: + partitions = loop.run_until_complete(get_partitions(client)) + else: + partitions = args.partitions.split(",") + pumps = [] + for pid in partitions: + sender = client.add_async_sender(partition=pid) + pumps.append(pump(pid, sender, args, args.duration)) + loop.run_until_complete(client.run_async()) + loop.run_until_complete(asyncio.gather(*pumps)) + finally: + loop.run_until_complete(client.stop_async()) + +if __name__ == '__main__': + test_long_running_partition_send_async() diff --git a/tests/test_negative.py b/tests/test_negative.py index fc41287..dbc8096 100644 --- a/tests/test_negative.py +++ b/tests/test_negative.py @@ -181,7 +181,7 @@ def test_receive_from_invalid_partitions_sync(connection_str): async def test_receive_from_invalid_partitions_async(connection_str): partitions = ["XYZ", "-1", "1000", "-" ] for p in partitions: - client = EventHubClientAsync.from_connection_string(connection_str, debug=False) + client = EventHubClientAsync.from_connection_string(connection_str, debug=True) receiver = client.add_async_receiver("$default", p) try: with pytest.raises(EventHubError): diff --git a/tests/test_receive.py b/tests/test_receive.py index 44cb7b2..fda5a96 100644 --- a/tests/test_receive.py +++ b/tests/test_receive.py @@ -31,7 +31,7 @@ def test_receive_end_of_stream(connection_str, senders): client.stop() -def test_receive_with_offset(connection_str, senders): +def test_receive_with_offset_sync(connection_str, senders): client = EventHubClient.from_connection_string(connection_str, debug=False) receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) try: @@ -44,7 +44,7 @@ def test_receive_with_offset(connection_str, senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset)) + offset_receiver = client.add_receiver("$default", "0", offset=offset) client.run() received = offset_receiver.receive(timeout=5) assert len(received) == 0 @@ -71,7 +71,7 @@ def test_receive_with_inclusive_offset(connection_str, senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset, inclusive=True)) + offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset.value, inclusive=True)) client.run() received = offset_receiver.receive(timeout=5) assert len(received) == 1 @@ -83,10 +83,13 @@ def test_receive_with_inclusive_offset(connection_str, senders): def test_receive_with_datetime(connection_str, senders): client = EventHubClient.from_connection_string(connection_str, debug=False) + partitions = client.get_eventhub_info() + assert partitions["partition_ids"] == ["0", "1"] receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) try: client.run() - + more_partitions = client.get_eventhub_info() + assert more_partitions["partition_ids"] == ["0", "1"] received = receiver.receive(timeout=5) assert len(received) == 0 senders[0].send(EventData(b"Data")) diff --git a/tests/test_receive_async.py b/tests/test_receive_async.py index 8cfb370..267e82c 100644 --- a/tests/test_receive_async.py +++ b/tests/test_receive_async.py @@ -46,7 +46,7 @@ async def test_receive_with_offset_async(connection_str, senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset)) + offset_receiver = client.add_async_receiver("$default", "0", offset=offset) await client.run_async() received = await offset_receiver.receive(timeout=5) assert len(received) == 0 @@ -73,7 +73,7 @@ async def test_receive_with_inclusive_offset_async(connection_str, senders): assert len(received) == 1 offset = received[0].offset - offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset, inclusive=True)) + offset_receiver = client.add_async_receiver("$default", "0", offset=Offset(offset.value, inclusive=True)) await client.run_async() received = await offset_receiver.receive(timeout=5) assert len(received) == 1 @@ -217,12 +217,14 @@ async def test_epoch_receiver_async(connection_str, senders): async def test_multiple_receiver_async(connection_str, senders): client = EventHubClientAsync.from_connection_string(connection_str, debug=True) partitions = await client.get_eventhub_info_async() + assert partitions["partition_ids"] == ["0", "1"] receivers = [] for i in range(2): receivers.append(client.add_async_receiver("$default", "0", prefetch=10)) try: await client.run_async() more_partitions = await client.get_eventhub_info_async() + assert more_partitions["partition_ids"] == ["0", "1"] outputs = await asyncio.gather( pump(receivers[0]), pump(receivers[1]), diff --git a/tests/test_reconnect.py b/tests/test_reconnect.py new file mode 100644 index 0000000..a6aa0ce --- /dev/null +++ b/tests/test_reconnect.py @@ -0,0 +1,128 @@ +#------------------------------------------------------------------------- +# Copyright (c) Microsoft Corporation. All rights reserved. +# Licensed under the MIT License. See License.txt in the project root for +# license information. +#-------------------------------------------------------------------------- + +import os +import time +import asyncio +import pytest + +from azure import eventhub +from azure.eventhub import ( + EventHubClientAsync, + EventData, + Offset, + EventHubError, + EventHubClient) + + +def test_send_with_long_interval_sync(connection_str, receivers): + #pytest.skip("long running") + client = EventHubClient.from_connection_string(connection_str, debug=True) + sender = client.add_sender() + try: + client.run() + sender.send(EventData(b"A single event")) + for _ in range(2): + time.sleep(300) + sender.send(EventData(b"A single event")) + finally: + client.stop() + + received = [] + for r in receivers: + received.extend(r.receive(timeout=1)) + + assert len(received) == 3 + assert list(received[0].body)[0] == b"A single event" + + +@pytest.mark.asyncio +async def test_send_with_long_interval_async(connection_str, receivers): + #pytest.skip("long running") + client = EventHubClientAsync.from_connection_string(connection_str, debug=True) + sender = client.add_async_sender() + try: + await client.run_async() + await sender.send(EventData(b"A single event")) + for _ in range(2): + await asyncio.sleep(300) + await sender.send(EventData(b"A single event")) + finally: + await client.stop_async() + + received = [] + for r in receivers: + received.extend(r.receive(timeout=1)) + assert len(received) == 3 + assert list(received[0].body)[0] == b"A single event" + + +def test_send_with_forced_conn_close_sync(connection_str, receivers): + #pytest.skip("long running") + client = EventHubClient.from_connection_string(connection_str, debug=True) + sender = client.add_sender() + try: + client.run() + sender.send(EventData(b"A single event")) + sender._handler._message_sender.destroy() + time.sleep(300) + sender.send(EventData(b"A single event")) + sender.send(EventData(b"A single event")) + sender._handler._message_sender.destroy() + time.sleep(300) + sender.send(EventData(b"A single event")) + sender.send(EventData(b"A single event")) + finally: + client.stop() + + received = [] + for r in receivers: + received.extend(r.receive(timeout=1)) + assert len(received) == 5 + assert list(received[0].body)[0] == b"A single event" + + +@pytest.mark.asyncio +async def test_send_with_forced_conn_close_async(connection_str, receivers): + #pytest.skip("long running") + client = EventHubClientAsync.from_connection_string(connection_str, debug=True) + sender = client.add_async_sender() + try: + await client.run_async() + await sender.send(EventData(b"A single event")) + sender._handler._message_sender.destroy() + await asyncio.sleep(300) + await sender.send(EventData(b"A single event")) + await sender.send(EventData(b"A single event")) + sender._handler._message_sender.destroy() + await asyncio.sleep(300) + await sender.send(EventData(b"A single event")) + await sender.send(EventData(b"A single event")) + finally: + await client.stop_async() + + received = [] + for r in receivers: + received.extend(r.receive(timeout=1)) + assert len(received) == 5 + assert list(received[0].body)[0] == b"A single event" + + +# def test_send_with_forced_link_detach(connection_str, receivers): +# client = EventHubClient.from_connection_string(connection_str, debug=True) +# sender = client.add_sender() +# size = 20 * 1024 +# try: +# client.run() +# for i in range(1000): +# sender.transfer(EventData([b"A"*size, b"B"*size, b"C"*size, b"D"*size, b"A"*size, b"B"*size, b"C"*size, b"D"*size, b"A"*size, b"B"*size, b"C"*size, b"D"*size])) +# sender.wait() +# finally: +# client.stop() + +# received = [] +# for r in receivers: +# received.extend(r.receive(timeout=10)) diff --git a/tests/test_send.py b/tests/test_send.py index faf116f..a74c3d1 100644 --- a/tests/test_send.py +++ b/tests/test_send.py @@ -164,8 +164,8 @@ def batched(): assert len(partition_1) == 10 -def test_send_array(connection_str, receivers): - client = EventHubClient.from_connection_string(connection_str, debug=False) +def test_send_array_sync(connection_str, receivers): + client = EventHubClient.from_connection_string(connection_str, debug=True) sender = client.add_sender() try: client.run() From 1ba74a50f0182db9e8aff57db1bdd2ba32f9e03f Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 26 Jul 2018 11:53:28 -0700 Subject: [PATCH 06/11] Updated uamqp dependency --- conftest.py | 4 ++-- setup.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/conftest.py b/conftest.py index 5fcf292..96d135d 100644 --- a/conftest.py +++ b/conftest.py @@ -94,7 +94,7 @@ def device_id(): @pytest.fixture() def receivers(connection_str): - client = EventHubClient.from_connection_string(connection_str, debug=True) + client = EventHubClient.from_connection_string(connection_str, debug=False) eh_hub_info = client.get_eventhub_info() partitions = eh_hub_info["partition_ids"] @@ -114,7 +114,7 @@ def receivers(connection_str): @pytest.fixture() def senders(connection_str): - client = EventHubClient.from_connection_string(connection_str, debug=False) + client = EventHubClient.from_connection_string(connection_str, debug=True) eh_hub_info = client.get_eventhub_info() partitions = eh_hub_info["partition_ids"] diff --git a/setup.py b/setup.py index 9fce5a2..df46435 100644 --- a/setup.py +++ b/setup.py @@ -55,7 +55,7 @@ zip_safe=False, packages=find_packages(exclude=["examples", "tests"]), install_requires=[ - 'uamqp~=0.1.0', + 'uamqp~=0.2.0', 'msrestazure~=0.4.11', 'azure-common~=1.1', 'azure-storage~=0.36.0' From 62c8e83c90a949ee52a0e623115b0dcfd2aa55e2 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 26 Jul 2018 12:09:08 -0700 Subject: [PATCH 07/11] Restructure for independent connections --- azure/eventhub/_async/__init__.py | 62 +++----------- azure/eventhub/_async/receiver_async.py | 91 ++++++++++++++++----- azure/eventhub/_async/sender_async.py | 98 +++++++++++++++------- azure/eventhub/client.py | 80 +++++++----------- azure/eventhub/common.py | 13 ++- azure/eventhub/receiver.py | 103 ++++++++++++++++-------- azure/eventhub/sender.py | 78 ++++++++++-------- 7 files changed, 304 insertions(+), 221 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index e961302..3041da5 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -15,7 +15,6 @@ from uamqp import authentication, constants, types, errors from uamqp import ( Message, - Source, ConnectionAsync, AMQPClientAsync, SendClientAsync, @@ -59,29 +58,6 @@ def _create_auth(self, username=None, password=None): # pylint: disable=no-self return authentication.SASLPlain(self.address.hostname, username, password) return authentication.SASTokenAsync.from_shared_access_key(self.auth_uri, username, password, timeout=60) - def _create_connection_async(self): - """ - Create a new ~uamqp._async.connection_async.ConnectionAsync instance that will be shared between all - AsyncSender/AsyncReceiver clients. - """ - if not self.connection: - log.info("{}: Creating connection with address={}".format( - self.container_id, self.address.geturl())) - self.connection = ConnectionAsync( - self.address.hostname, - self.auth, - container_id=self.container_id, - properties=self._create_properties(), - debug=self.debug) - - async def _close_connection_async(self): - """ - Close and destroy the connection async. - """ - if self.connection: - await self.connection.destroy_async() - self.connection = None - async def _close_clients_async(self): """ Close all open AsyncSender/AsyncReceiver clients. @@ -91,17 +67,13 @@ async def _close_clients_async(self): async def _wait_for_client(self, client): try: while client.get_handler_state().value == 2: - await self.connection.work_async() + await client._handler._connection.work_async() except Exception as exp: # pylint: disable=broad-except await client.close_async(exception=exp) async def _start_client_async(self, client): try: - await client.open_async(self.connection) - started = await client.has_started() - while not started: - await self.connection.work_async() - started = await client.has_started() + await client.open_async() except Exception as exp: # pylint: disable=broad-except await client.close_async(exception=exp) @@ -114,13 +86,8 @@ async def _handle_redirect(self, redirects): redirects = [c.redirected for c in self.clients if c.redirected] if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self.auth_uri = redirects[0].address.decode('utf-8') - self.auth = self._create_auth() - new_target, _, _ = self.auth_uri.partition("/ConsumerGroups") - self.address = urlparse(new_target) - self.mgmt_node = new_target.encode('UTF-8') + b"/$management" - await self.connection.redirect_async(redirects[0], self.auth) - await asyncio.gather(*[c.open_async(self.connection) for c in self.clients]) + self._process_redirect_uri(redirects[0]) + await asyncio.gather(*[c.open_async() for c in self.clients]) async def run_async(self): """ @@ -135,7 +102,6 @@ async def run_async(self): :rtype: list[~azure.eventhub.common.EventHubError] """ log.info("{}: Starting {} clients".format(self.container_id, len(self.clients))) - self._create_connection_async() tasks = [self._start_client_async(c) for c in self.clients] try: await asyncio.gather(*tasks) @@ -163,7 +129,6 @@ async def stop_async(self): log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients))) self.stopped = True await self._close_clients_async() - await self._close_connection_async() async def get_eventhub_info_async(self): """ @@ -171,18 +136,18 @@ async def get_eventhub_info_async(self): :rtype: dict """ - eh_name = self.address.path.lstrip('/') - target = "amqps://{}/{}".format(self.address.hostname, eh_name) + alt_creds = { + "username": self._auth_config.get("iot_username"), + "password":self._auth_config.get("iot_password")} try: - mgmt_auth = self._create_auth() - mgmt_client = AMQPClientAsync(target, auth=mgmt_auth, debug=self.debug) + mgmt_auth = self._create_auth(**alt_creds) + mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug) await mgmt_client.open_async() - mgmt_msg = Message(application_properties={'name': eh_name}) + mgmt_msg = Message(application_properties={'name': self.eh_name}) response = await mgmt_client.mgmt_request_async( mgmt_msg, constants.READ_OPERATION, op_type=b'com.microsoft:eventhub', - node=self.mgmt_node, status_code_field=b'status-code', description_fields=b'status-description') eh_info = response.get_data() @@ -215,12 +180,11 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30 :rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync """ path = self.address.path + operation if operation else self.address.path + source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - source = Source(source_url) - if offset is not None: - source.set_filter(offset.selector()) - handler = AsyncReceiver(self, source, prefetch=prefetch, loop=loop) + print("RECEIVER_PATH", source_url) + handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop) self.clients.append(handler) return handler diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/_async/receiver_async.py index b3b5138..072e04a 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/_async/receiver_async.py @@ -6,10 +6,11 @@ import asyncio from uamqp import errors, types -from uamqp import ReceiveClientAsync +from uamqp import ReceiveClientAsync, Source from azure.eventhub import EventHubError, EventData from azure.eventhub.receiver import Receiver +from azure.eventhub.common import _error_handler class AsyncReceiver(Receiver): @@ -17,7 +18,7 @@ class AsyncReceiver(Receiver): Implements the async API of a Receiver. """ - def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called + def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called """ Instantiate an async receiver. @@ -33,25 +34,32 @@ def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pyli :param loop: An event loop. """ self.loop = loop or asyncio.get_event_loop() + self.client = client + self.source = source + self.offset = offset + self.prefetch = prefetch + self.epoch = epoch + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.redirected = None self.error = None - self.debug = client.debug - self.offset = None - self.prefetch = prefetch self.properties = None - self.epoch = epoch + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) if epoch: self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))} self._handler = ReceiveClientAsync( source, - auth=client.auth, - debug=self.debug, + auth=self.client.get_auth(), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, loop=self.loop) - async def open_async(self, connection): + async def open_async(self): """ Open the Receiver using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -61,15 +69,51 @@ async def open_async(self, connection): :type: connection: ~uamqp._async.connection_async.ConnectionAsync """ if self.redirected: + self.source = self.redirected.address + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = ReceiveClientAsync( - self.redirected.address, - auth=None, - debug=self.debug, + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, loop=self.loop) - await self._handler.open_async(connection=connection) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() + + async def reconnect_async(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + await self._handler.close_async() + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + self._handler = ReceiveClientAsync( + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() async def has_started(self): """ @@ -131,25 +175,28 @@ async def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 message_batch = await self._handler.receive_message_batch_async( max_batch_size=max_batch_size, timeout=timeout_ms) - data_batch = [] for message in message_batch: event_data = EventData(message=message) self.offset = event_data.offset data_batch.append(event_data) return data_batch - except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - await self.close_async(exception=error) - raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - await self.close_async(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + return data_batch + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except (errors.MessageHandlerError): + await self.reconnect_async() + return data_batch except Exception as e: error = EventHubError("Receive failed: {}".format(e)) await self.close_async(exception=error) diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index abcd0fa..3a10d6f 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -10,6 +10,7 @@ from azure.eventhub import EventHubError from azure.eventhub.sender import Sender +from azure.eventhub.common import _error_handler class AsyncSender(Sender): """ @@ -26,23 +27,28 @@ def __init__(self, client, target, partition=None, loop=None): # pylint: disabl :type target: str :param loop: An event loop. """ + self.loop = loop or asyncio.get_event_loop() + self.client = client + self.target = target + self.partition = partition + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.redirected = None self.error = None - self.debug = client.debug - self.partition = partition if partition: - target += "/Partitions/" + partition - self.loop = loop or asyncio.get_event_loop() + self.target += "/Partitions/" + partition self._handler = SendClientAsync( - target, - auth=client.auth, - debug=self.debug, + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), loop=self.loop) self._outcome = None self._condition = None - async def open_async(self, connection): + async def open_async(self): """ Open the Sender using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -52,12 +58,44 @@ async def open_async(self, connection): :type: connection:~uamqp._async.connection_async.ConnectionAsync """ if self.redirected: + self.target = self.redirected.address + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = SendClientAsync( - self.redirected.address, - auth=None, - debug=self.debug, - msg_timeout=Sender.TIMEOUT) - await self._handler.open_async(connection=connection) + self.target, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + while not await self.has_started(): + await self._handler._connection.work_async() + + async def reconnect_async(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) + unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + await self._handler.close_async() + self._handler = SendClientAsync( + self.target, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + msg_timeout=Sender.TIMEOUT, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties(), + loop=self.loop) + await self._handler.open_async() + self._handler._pending_messages = unsent_events + await self._handler.wait_async() async def has_started(self): """ @@ -124,14 +162,15 @@ async def send(self, event_data): await self._handler.send_message_async(event_data.message) if self._outcome != constants.MessageSendResult.Ok: raise Sender._error(self._outcome, self._condition) - except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - await self.close_async(exception=error) - raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - await self.close_async(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except (errors.MessageHandlerError): + await self.reconnect_async() except Exception as e: error = EventHubError("Send failed: {}".format(e)) await self.close_async(exception=error) @@ -147,13 +186,14 @@ async def wait_async(self): raise self.error try: await self._handler.wait_async() - except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - await self.close_async(exception=error) - raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - await self.close_async(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + await self.reconnect_async() + else: + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error + except (errors.MessageHandlerError): + await self.reconnect_async() except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index d85df56..1659f70 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -8,6 +8,7 @@ import sys import uuid import time +import functools try: from urllib import urlparse, unquote_plus, urlencode, quote_plus except ImportError: @@ -16,7 +17,6 @@ import uamqp from uamqp import Connection from uamqp import Message -from uamqp import Source from uamqp import authentication from uamqp import constants @@ -108,7 +108,8 @@ def __init__(self, address, username=None, password=None, debug=False): """ self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] self.address = urlparse(address) - self.mgmt_node = b"$management" + self.eh_name = self.address.path.lstrip('/') + self.mgmt_target = "amqps://{}/{}".format(self.address.hostname, self.eh_name) url_username = unquote_plus(self.address.username) if self.address.username else None username = username or url_username url_password = unquote_plus(self.address.password) if self.address.password else None @@ -117,8 +118,7 @@ def __init__(self, address, username=None, password=None, debug=False): raise ValueError("Missing username and/or password.") self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) self._auth_config = {'username': username, 'password': password} - self.auth = self._create_auth() - self.connection = None + self.get_auth = functools.partial(self._create_auth) self.debug = debug self.clients = [] @@ -148,10 +148,10 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): password = _generate_sas_token(address, policy, key) client = cls("amqps://" + address, username=username, password=password, **kwargs) client._auth_config = { - 'username': policy, - 'password': key, - 'iot_username': username, - 'iot_password': password} # pylint: disable=protected-access + 'iot_username': policy, + 'iot_password': key, + 'username': username, + 'password': password} # pylint: disable=protected-access return client def _create_auth(self, username=None, password=None): @@ -172,7 +172,7 @@ def _create_auth(self, username=None, password=None): return authentication.SASLPlain(self.address.hostname, username, password) return authentication.SASTokenAuth.from_shared_access_key(self.auth_uri, username, password, timeout=60) - def _create_properties(self): # pylint: disable=no-self-use + def create_properties(self): # pylint: disable=no-self-use """ Format the properties with which to instantiate the connection. This acts like a user agent over HTTP. @@ -186,29 +186,6 @@ def _create_properties(self): # pylint: disable=no-self-use properties["platform"] = sys.platform return properties - def _create_connection(self): - """ - Create a new ~uamqp.connection.Connection instance that will be shared between all - Sender/Receiver clients. - """ - if not self.connection: - log.info("{}: Creating connection with address={}".format( - self.container_id, self.address.geturl())) - self.connection = Connection( - self.address.hostname, - self.auth, - container_id=self.container_id, - properties=self._create_properties(), - debug=self.debug) - - def _close_connection(self): - """ - Close and destroy the connection. - """ - if self.connection: - self.connection.destroy() - self.connection = None - def _close_clients(self): """ Close all open Sender/Receiver clients. @@ -219,25 +196,26 @@ def _close_clients(self): def _start_clients(self): for client in self.clients: try: - client.open(self.connection) - while not client.has_started(): - self.connection.work() + client.open() except Exception as exp: # pylint: disable=broad-except client.close(exception=exp) + def _process_redirect_uri(self, redirect): + redirect_uri = redirect.address.decode('utf-8') + auth_uri, _, _ = redirect_uri.partition("/ConsumerGroups") + self.address = urlparse(auth_uri) + self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path) + self.eh_name = self.address.path.lstrip('/') + self.mgmt_target = redirect_uri + def _handle_redirect(self, redirects): if len(redirects) != len(self.clients): raise EventHubError("Some clients are attempting to redirect the connection.") if not all(r.hostname == redirects[0].hostname for r in redirects): raise EventHubError("Multiple clients attempting to redirect to different hosts.") - self.auth_uri = redirects[0].address.decode('utf-8') - self.auth = self._create_auth() - new_target, _, _ = self.auth_uri.partition("/ConsumerGroups") - self.address = urlparse(new_target) - self.mgmt_node = new_target.encode('UTF-8') + b"/$management" - self.connection.redirect(redirects[0], self.auth) + self._process_redirect_uri(redirects[0]) for client in self.clients: - client.open(self.connection) + client.open() def run(self): """ @@ -252,7 +230,6 @@ def run(self): :rtype: list[~azure.eventhub.common.EventHubError] """ log.info("{}: Starting {} clients".format(self.container_id, len(self.clients))) - self._create_connection() try: self._start_clients() redirects = [c.redirected for c in self.clients if c.redirected] @@ -279,7 +256,6 @@ def stop(self): log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients))) self.stopped = True self._close_clients() - self._close_connection() def get_eventhub_info(self): """ @@ -293,13 +269,14 @@ def get_eventhub_info(self): :rtype: dict """ - eh_name = self.address.path.lstrip('/') - target = "amqps://{}/{}".format(self.address.hostname, eh_name) - mgmt_auth = self._create_auth() - mgmt_client = uamqp.AMQPClient(target, auth=mgmt_auth, debug=self.debug) + alt_creds = { + "username": self._auth_config.get("iot_username"), + "password":self._auth_config.get("iot_password")} try: + mgmt_auth = self._create_auth(**alt_creds) + mgmt_client = uamqp.AMQPClient(self.mgmt_target, auth=mgmt_auth, debug=self.debug) mgmt_client.open() - mgmt_msg = Message(application_properties={'name': eh_name}) + mgmt_msg = Message(application_properties={'name': self.eh_name}) response = mgmt_client.mgmt_request( mgmt_msg, constants.READ_OPERATION, @@ -340,10 +317,7 @@ def add_receiver(self, consumer_group, partition, offset=None, prefetch=300, ope path = self.address.path + operation if operation else self.address.path source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - source = Source(source_url) - if offset is not None: - source.set_filter(offset.selector()) - handler = Receiver(self, source, prefetch=prefetch) + handler = Receiver(self, source_url, offset=offset, prefetch=prefetch) self.clients.append(handler) return handler diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index f14e778..a89eebc 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -10,6 +10,13 @@ from uamqp import types, constants, errors from uamqp.message import MessageHeader, MessageProperties +_NO_RETRY_ERRORS = ( + b"com.microsoft:argument-out-of-range", + b"com.microsoft:entity-disabled", + b"com.microsoft:auth-failed", + b"com.microsoft:precondition-failed", + b"com.microsoft:argument-error" +) def _error_handler(error): """ @@ -29,7 +36,9 @@ def _error_handler(error): elif error.condition == b'com.microsoft:operation-cancelled': return errors.ErrorAction(retry=True) elif error.condition == b"com.microsoft:container-close": - return errors.ErrorAction(retry=True) + return errors.ErrorAction(retry=True, backoff=4) + elif error.condition in _NO_RETRY_ERRORS: + return errors.ErrorAction(retry=False) return errors.ErrorAction(retry=True) @@ -97,7 +106,7 @@ def offset(self): :rtype: int """ try: - return self._annotations[EventData.PROP_OFFSET].decode('UTF-8') + return Offset(self._annotations[EventData.PROP_OFFSET].decode('UTF-8')) except (KeyError, AttributeError): return None diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index a21b30f..7e0c5e6 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -4,9 +4,9 @@ # -------------------------------------------------------------------------------------------- from uamqp import types, errors -from uamqp import ReceiveClient +from uamqp import ReceiveClient, Source -from azure.eventhub.common import EventHubError, EventData, Offset +from azure.eventhub.common import EventHubError, EventData, Offset, _error_handler class Receiver: @@ -16,38 +16,46 @@ class Receiver: timeout = 0 _epoch = b'com.microsoft:epoch' - def __init__(self, client, source, prefetch=300, epoch=None): + def __init__(self, client, source, offset=None, prefetch=300, epoch=None): """ Instantiate a receiver. :param client: The parent EventHubClient. :type client: ~azure.eventhub.client.EventHubClient :param source: The source EventHub from which to receive events. - :type source: ~uamqp.address.Source + :type source: str :param prefetch: The number of events to prefetch from the service for processing. Default is 300. :type prefetch: int :param epoch: An optional epoch value. :type epoch: int """ - self.offset = None + self.client = client + self.source = source + self.offset = offset self.prefetch = prefetch self.epoch = epoch + self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) self.properties = None self.redirected = None - self.debug = client.debug self.error = None + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) if epoch: self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))} self._handler = ReceiveClient( source, - auth=client.auth, - debug=self.debug, + auth=self.client.get_auth(), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, - timeout=self.timeout) + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) - def open(self, connection): + def open(self): """ Open the Receiver using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -57,14 +65,50 @@ def open(self, connection): :type: connection: ~uamqp.connection.Connection """ if self.redirected: + self.source = self.redirected.address + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = ReceiveClient( - self.redirected.address, - auth=None, - debug=self.debug, + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, prefetch=self.prefetch, link_properties=self.properties, - timeout=self.timeout) - self._handler.open(connection) + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() + while not self.has_started(): + self._handler._connection.work() + + def reconnect(self): + """If the Receiver was disconnected from the service with + a retryable error - attempt to reconnect.""" + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} + self._handler.close() + source = Source(self.source) + if self.offset is not None: + source.set_filter(self.offset.selector()) + self._handler = ReceiveClient( + source, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, + prefetch=self.prefetch, + link_properties=self.properties, + timeout=self.timeout, + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() + while not self.has_started(): + self._handler._connection.work() def get_handler_state(self): """ @@ -146,34 +190,29 @@ def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 message_batch = self._handler.receive_message_batch( max_batch_size=max_batch_size, timeout=timeout_ms) - data_batch = [] for message in message_batch: event_data = EventData(message=message) self.offset = event_data.offset data_batch.append(event_data) return data_batch - except errors.LinkDetach as detach: - error = EventHubError(str(detach), detach) - self.close(exception=error) - raise error + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: + self.reconnect() + return data_batch + else: + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error + except (errors.MessageHandlerError): + self.reconnect() + return data_batch except Exception as e: error = EventHubError("Receive failed: {}".format(e)) self.close(exception=error) raise error - - def selector(self, default): - """ - Create a selector for the current offset if it is set. - - :param default: The fallback receive offset. - :type default: ~azure.eventhub.common.Offset - :rtype: ~azure.eventhub.common.Offset - """ - if self.offset is not None: - return Offset(self.offset).selector() - return default diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index f635da8..ccc69d8 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -27,24 +27,26 @@ def __init__(self, client, target, partition=None): :param target: The URI of the EventHub to send to. :type target: str """ - self.conneciton = None + self.client = client + self.target = target + self.partition = partition self.redirected = None self.error = None - self.debug = client.debug - self.partition = partition self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler) if partition: - target += "/Partitions/" + partition + self.target += "/Partitions/" + partition self._handler = SendClient( - target, - auth=client.auth, - debug=self.debug, + self.target, + auth=self.client.get_auth(), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, - error_policy=self.retry_policy) + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) self._outcome = None self._condition = None - def open(self, connection): + def open(self): #, connection): """ Open the Sender using the supplied conneciton. If the handler has previously been redirected, the redirect @@ -53,29 +55,41 @@ def open(self, connection): :param connection: The underlying client shared connection. :type: connection: ~uamqp.connection.Connection """ - self.connection = connection if self.redirected: + self.target = self.redirected.address + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler = SendClient( - self.redirected.address, - auth=None, - debug=self.debug, + self.target, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, - retry_policy=self.retry_policy) - self._handler.open(connection) + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() #connection) + while not self.has_started(): + self._handler._connection.work() def reconnect(self): """If the Sender was disconnected from the service with a retryable error - attempt to reconnect.""" pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] + alt_creds = { + "username": self.client._auth_config.get("iot_username"), + "password":self.client._auth_config.get("iot_password")} self._handler.close() self._handler = SendClient( - self.redirected.address, - auth=None, - debug=self.debug, + self.target, + auth=self.client.get_auth(**alt_creds), + debug=self.client.debug, msg_timeout=Sender.TIMEOUT, - retry_policy=self.retry_policy) - self._handler.open(self.connection) + error_policy=self.retry_policy, + keep_alive_interval=30, + properties=self.client.create_properties()) + self._handler.open() self._handler._pending_messages = unsent_events self._handler.wait() @@ -158,17 +172,15 @@ def send(self, event_data): error = EventHubError(str(failed), failed) self.close(exception=error) raise error - except errors.LinkDetach as detach: - if detach.action.retry: + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: self.reconnect() else: - error = EventHubError(str(detach), detach) + error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - self.close(exception=error) - raise error + except (errors.MessageHandlerError): + self.reconnect() except Exception as e: error = EventHubError("Send failed: {}".format(e)) self.close(exception=error) @@ -202,17 +214,15 @@ def wait(self): raise self.error try: self._handler.wait() - except errors.LinkDetach as detach: - if detach.action.retry: + except (errors.LinkDetach, errors.ConnectionClose) as shutdown: + if shutdown.action.retry: self.reconnect() else: - error = EventHubError(str(detach), detach) + error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except errors.ConnectionClose as close: - error = EventHubError(str(close), close) - self.close(exception=error) - raise error + except (errors.MessageHandlerError): + self.reconnect() except Exception as e: raise EventHubError("Send failed: {}".format(e)) From 35e1a67aaec4feda5a9503cb441e1a0a6c0b261b Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 26 Jul 2018 12:29:50 -0700 Subject: [PATCH 08/11] Added HTTP proxy support Fix for issue #41 --- azure/eventhub/_async/__init__.py | 6 ++++-- azure/eventhub/client.py | 13 ++++++++++--- azure/eventprocessorhost/eh_partition_pump.py | 3 ++- azure/eventprocessorhost/eph.py | 1 + azure/eventprocessorhost/partition_manager.py | 3 ++- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index 3041da5..16c6b77 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -55,8 +55,10 @@ def _create_auth(self, username=None, password=None): # pylint: disable=no-self username = username or self._auth_config['username'] password = password or self._auth_config['password'] if "@sas.root" in username: - return authentication.SASLPlain(self.address.hostname, username, password) - return authentication.SASTokenAsync.from_shared_access_key(self.auth_uri, username, password, timeout=60) + return authentication.SASLPlain( + self.address.hostname, username, password, http_proxy=self.http_proxy) + return authentication.SASTokenAsync.from_shared_access_key( + self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy) async def _close_clients_async(self): """ diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 1659f70..6ac4dde 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -89,7 +89,7 @@ class EventHubClient(object): events to and receiving events from the Azure Event Hubs service. """ - def __init__(self, address, username=None, password=None, debug=False): + def __init__(self, address, username=None, password=None, debug=False, http_proxy=None): """ Constructs a new EventHubClient with the given address URL. @@ -105,10 +105,15 @@ def __init__(self, address, username=None, password=None, debug=False): :param debug: Whether to output network trace logs to the logger. Default is `False`. :type debug: bool + :param http_proxy: HTTP proxy settings. This must be a dictionary with the following + keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). + Additionally the following keys may also be present: 'username', 'password'. + :type http_proxy: dict[str, Any] """ self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8] self.address = urlparse(address) self.eh_name = self.address.path.lstrip('/') + self.http_proxy = http_proxy self.mgmt_target = "amqps://{}/{}".format(self.address.hostname, self.eh_name) url_username = unquote_plus(self.address.username) if self.address.username else None username = username or url_username @@ -169,8 +174,10 @@ def _create_auth(self, username=None, password=None): username = username or self._auth_config['username'] password = password or self._auth_config['password'] if "@sas.root" in username: - return authentication.SASLPlain(self.address.hostname, username, password) - return authentication.SASTokenAuth.from_shared_access_key(self.auth_uri, username, password, timeout=60) + return authentication.SASLPlain( + self.address.hostname, username, password, http_proxy=self.http_proxy) + return authentication.SASTokenAuth.from_shared_access_key( + self.auth_uri, username, password, timeout=60, http_proxy=self.http_proxy) def create_properties(self): # pylint: disable=no-self-use """ diff --git a/azure/eventprocessorhost/eh_partition_pump.py b/azure/eventprocessorhost/eh_partition_pump.py index 86c42d2..4801a25 100644 --- a/azure/eventprocessorhost/eh_partition_pump.py +++ b/azure/eventprocessorhost/eh_partition_pump.py @@ -66,7 +66,8 @@ async def open_clients_async(self): # Create event hub client and receive handler and set options self.eh_client = EventHubClientAsync( self.host.eh_config.client_address, - debug=self.host.eph_options.debug_trace) + debug=self.host.eph_options.debug_trace, + http_proxy=self.host.eph_options.http_proxy) self.partition_receive_handler = self.eh_client.add_async_receiver( self.partition_context.consumer_group_name, self.partition_context.partition_id, diff --git a/azure/eventprocessorhost/eph.py b/azure/eventprocessorhost/eph.py index 27cbc3e..7c7541e 100644 --- a/azure/eventprocessorhost/eph.py +++ b/azure/eventprocessorhost/eph.py @@ -73,3 +73,4 @@ def __init__(self): self.release_pump_on_timeout = False self.initial_offset_provider = "-1" self.debug_trace = False + self.http_proxy = None diff --git a/azure/eventprocessorhost/partition_manager.py b/azure/eventprocessorhost/partition_manager.py index da58423..7025fe0 100644 --- a/azure/eventprocessorhost/partition_manager.py +++ b/azure/eventprocessorhost/partition_manager.py @@ -38,7 +38,8 @@ async def get_partition_ids_async(self): try: eh_client = EventHubClientAsync( self.host.eh_config.client_address, - debug=self.host.eph_options.debug_trace) + debug=self.host.eph_options.debug_trace, + http_proxy=self.host.eph_options.http_proxy) try: eh_info = await eh_client.get_eventhub_info_async() self.partition_ids = eh_info['partition_ids'] From 0f5ddda3637706baa75f2f385259e1f50b142809 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 26 Jul 2018 13:43:02 -0700 Subject: [PATCH 09/11] Fixed some tests + samples --- azure/eventhub/_async/sender_async.py | 10 ++-------- azure/eventhub/sender.py | 10 ++-------- azure/eventprocessorhost/partition_context.py | 4 ++-- examples/recv.py | 2 +- examples/recv_async.py | 2 +- examples/recv_batch.py | 2 +- tests/test_iothub_receive.py | 2 ++ tests/test_iothub_send.py | 1 - tests/test_receive.py | 4 ++++ 9 files changed, 15 insertions(+), 22 deletions(-) diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index 3a10d6f..cf7174e 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -59,12 +59,9 @@ async def open_async(self): """ if self.redirected: self.target = self.redirected.address - alt_creds = { - "username": self.client._auth_config.get("iot_username"), - "password":self.client._auth_config.get("iot_password")} self._handler = SendClientAsync( self.target, - auth=self.client.get_auth(**alt_creds), + auth=self.client.get_auth(), debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, @@ -80,13 +77,10 @@ async def reconnect_async(self): a retryable error - attempt to reconnect.""" pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] - alt_creds = { - "username": self.client._auth_config.get("iot_username"), - "password":self.client._auth_config.get("iot_password")} await self._handler.close_async() self._handler = SendClientAsync( self.target, - auth=self.client.get_auth(**alt_creds), + auth=self.client.get_auth(), debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index ccc69d8..e4455c4 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -57,12 +57,9 @@ def open(self): #, connection): """ if self.redirected: self.target = self.redirected.address - alt_creds = { - "username": self.client._auth_config.get("iot_username"), - "password":self.client._auth_config.get("iot_password")} self._handler = SendClient( self.target, - auth=self.client.get_auth(**alt_creds), + auth=self.client.get_auth(), debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, @@ -77,13 +74,10 @@ def reconnect(self): a retryable error - attempt to reconnect.""" pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] - alt_creds = { - "username": self.client._auth_config.get("iot_username"), - "password":self.client._auth_config.get("iot_password")} self._handler.close() self._handler = SendClient( self.target, - auth=self.client.get_auth(**alt_creds), + auth=self.client.get_auth(), debug=self.client.debug, msg_timeout=Sender.TIMEOUT, error_policy=self.retry_policy, diff --git a/azure/eventprocessorhost/partition_context.py b/azure/eventprocessorhost/partition_context.py index 9eaf53f..b21514b 100644 --- a/azure/eventprocessorhost/partition_context.py +++ b/azure/eventprocessorhost/partition_context.py @@ -34,7 +34,7 @@ def set_offset_and_sequence_number(self, event_data): """ if not event_data: raise Exception(event_data) - self.offset = event_data.offset + self.offset = event_data.offset.value self.sequence_number = event_data.sequence_number async def get_initial_offset_async(self): # throws InterruptedException, ExecutionException @@ -84,7 +84,7 @@ async def checkpoint_async_event_data(self, event_data): raise ValueError("Argument Out Of Range event_data x-opt-sequence-number") await self.persist_checkpoint_async(Checkpoint(self.partition_id, - event_data.offset, + event_data.offset.value, event_data.sequence_number)) def to_string(self): diff --git a/examples/recv.py b/examples/recv.py index 92a5df2..d2fbdf7 100644 --- a/examples/recv.py +++ b/examples/recv.py @@ -41,7 +41,7 @@ for event_data in receiver.receive(timeout=100): last_offset = event_data.offset last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) + print("Received: {}, {}".format(last_offset.value, last_sn)) total += 1 end_time = time.time() diff --git a/examples/recv_async.py b/examples/recv_async.py index ab8da39..04d9226 100644 --- a/examples/recv_async.py +++ b/examples/recv_async.py @@ -39,7 +39,7 @@ async def pump(client, partition): for event_data in await receiver.receive(timeout=10): last_offset = event_data.offset last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset, last_sn)) + print("Received: {}, {}".format(last_offset.value, last_sn)) total += 1 end_time = time.time() run_time = end_time - start_time diff --git a/examples/recv_batch.py b/examples/recv_batch.py index 7ce562d..9478f51 100644 --- a/examples/recv_batch.py +++ b/examples/recv_batch.py @@ -40,7 +40,7 @@ client.run() batched_events = receiver.receive(max_batch_size=10) for event_data in batched_events: - last_offset = event_data.offset + last_offset = event_data.offset.value last_sn = event_data.sequence_number total += 1 print("Partition {}, Received {}, sn={} offset={}".format( diff --git a/tests/test_iothub_receive.py b/tests/test_iothub_receive.py index a48274b..ced6858 100644 --- a/tests/test_iothub_receive.py +++ b/tests/test_iothub_receive.py @@ -16,6 +16,8 @@ def test_iothub_receive_sync(iot_connection_str, device_id): receiver = client.add_receiver("$default", "0", operation='/messages/events') try: client.run() + partitions = client.get_eventhub_info() + assert partitions["partition_ids"] == ["0", "1", "2", "3"] received = receiver.receive(timeout=5) assert len(received) == 0 finally: diff --git a/tests/test_iothub_send.py b/tests/test_iothub_send.py index 3f39c61..7c0dd7c 100644 --- a/tests/test_iothub_send.py +++ b/tests/test_iothub_send.py @@ -20,7 +20,6 @@ def test_iothub_send_single_event(iot_connection_str, device_id): sender = client.add_sender(operation='/messages/devicebound') try: client.run() - partitions = client.get_eventhub_info() outcome = sender.send(EventData(b"A single event", to_device=device_id)) assert outcome.value == 0 except: diff --git a/tests/test_receive.py b/tests/test_receive.py index fda5a96..1b7480e 100644 --- a/tests/test_receive.py +++ b/tests/test_receive.py @@ -33,9 +33,13 @@ def test_receive_end_of_stream(connection_str, senders): def test_receive_with_offset_sync(connection_str, senders): client = EventHubClient.from_connection_string(connection_str, debug=False) + partitions = client.get_eventhub_info() + assert partitions["partition_ids"] == ["0", "1"] receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) try: client.run() + more_partitions = client.get_eventhub_info() + assert more_partitions["partition_ids"] == ["0", "1"] received = receiver.receive(timeout=5) assert len(received) == 0 From 5130b2a39a5a26f46e7dc245d770da0d0c587b71 Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 26 Jul 2018 13:54:41 -0700 Subject: [PATCH 10/11] pylint fixes --- azure/eventhub/_async/__init__.py | 4 +--- azure/eventhub/_async/receiver_async.py | 4 +++- azure/eventhub/_async/sender_async.py | 9 +++++---- azure/eventhub/client.py | 5 ++--- azure/eventhub/common.py | 4 ++-- azure/eventhub/receiver.py | 6 ++++-- azure/eventhub/sender.py | 12 +++++------- 7 files changed, 22 insertions(+), 22 deletions(-) diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index 16c6b77..93a60c5 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -69,7 +69,7 @@ async def _close_clients_async(self): async def _wait_for_client(self, client): try: while client.get_handler_state().value == 2: - await client._handler._connection.work_async() + await client._handler._connection.work_async() # pylint: disable=protected-access except Exception as exp: # pylint: disable=broad-except await client.close_async(exception=exp) @@ -182,10 +182,8 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30 :rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync """ path = self.address.path + operation if operation else self.address.path - source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - print("RECEIVER_PATH", source_url) handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop) self.clients.append(handler) return handler diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/_async/receiver_async.py index 072e04a..6ba2c3a 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/_async/receiver_async.py @@ -68,6 +68,7 @@ async def open_async(self): :param connection: The underlying client shared connection. :type: connection: ~uamqp._async.connection_async.ConnectionAsync """ + # pylint: disable=protected-access if self.redirected: self.source = self.redirected.address source = Source(self.source) @@ -93,6 +94,7 @@ async def open_async(self): async def reconnect_async(self): """If the Receiver was disconnected from the service with a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access alt_creds = { "username": self.client._auth_config.get("iot_username"), "password":self.client._auth_config.get("iot_password")} @@ -194,7 +196,7 @@ async def receive(self, max_batch_size=None, timeout=None): error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: await self.reconnect_async() return data_batch except Exception as e: diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index cf7174e..42865f3 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -70,11 +70,12 @@ async def open_async(self): loop=self.loop) await self._handler.open_async() while not await self.has_started(): - await self._handler._connection.work_async() + await self._handler._connection.work_async() # pylint: disable=protected-access async def reconnect_async(self): """If the Receiver was disconnected from the service with a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] await self._handler.close_async() @@ -130,7 +131,7 @@ async def close_async(self, exception=None): elif isinstance(exception, EventHubError): self.error = exception elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self.error = EventHubError(str(error), error) + self.error = EventHubError(str(exception), exception) elif exception: self.error = EventHubError(str(exception)) else: @@ -163,7 +164,7 @@ async def send(self, event_data): error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: await self.reconnect_async() except Exception as e: error = EventHubError("Send failed: {}".format(e)) @@ -187,7 +188,7 @@ async def wait_async(self): error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: await self.reconnect_async() except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 6ac4dde..9c57cbd 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -15,7 +15,6 @@ from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus import uamqp -from uamqp import Connection from uamqp import Message from uamqp import authentication from uamqp import constants @@ -152,11 +151,11 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): username = "{}@sas.root.{}".format(policy, hub_name) password = _generate_sas_token(address, policy, key) client = cls("amqps://" + address, username=username, password=password, **kwargs) - client._auth_config = { + client._auth_config = { # pylint: disable=protected-access 'iot_username': policy, 'iot_password': key, 'username': username, - 'password': password} # pylint: disable=protected-access + 'password': password} return client def _create_auth(self, username=None, password=None): diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index a89eebc..035a812 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -257,7 +257,7 @@ def __init__(self, message, details=None): self._parse_error(details.description) for detail in self.details: self.message += "\n{}".format(detail) - except: + except: # pylint: disable=bare-except self.message += "\n{}".format(details) super(EventHubError, self).__init__(self.message) @@ -268,7 +268,7 @@ def _parse_error(self, error_list): if details_index >= 0: details_msg = self.message[details_index + 1:] self.message = self.message[0:details_index] - + tracking_index = details_msg.index(", TrackingId:") system_index = details_msg.index(", SystemTracker:") timestamp_index = details_msg.index(", Timestamp:") diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index 7e0c5e6..49a15ce 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -6,7 +6,7 @@ from uamqp import types, errors from uamqp import ReceiveClient, Source -from azure.eventhub.common import EventHubError, EventData, Offset, _error_handler +from azure.eventhub.common import EventHubError, EventData, _error_handler class Receiver: @@ -64,6 +64,7 @@ def open(self): :param connection: The underlying client shared connection. :type: connection: ~uamqp.connection.Connection """ + # pylint: disable=protected-access if self.redirected: self.source = self.redirected.address source = Source(self.source) @@ -89,6 +90,7 @@ def open(self): def reconnect(self): """If the Receiver was disconnected from the service with a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access alt_creds = { "username": self.client._auth_config.get("iot_username"), "password":self.client._auth_config.get("iot_password")} @@ -209,7 +211,7 @@ def receive(self, max_batch_size=None, timeout=None): error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: self.reconnect() return data_batch except Exception as e: diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index e4455c4..37d6024 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -3,9 +3,6 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -import time - -import uamqp from uamqp import constants, errors from uamqp import SendClient @@ -65,13 +62,14 @@ def open(self): #, connection): error_policy=self.retry_policy, keep_alive_interval=30, properties=self.client.create_properties()) - self._handler.open() #connection) + self._handler.open() while not self.has_started(): - self._handler._connection.work() + self._handler._connection.work() # pylint: disable=protected-access def reconnect(self): """If the Sender was disconnected from the service with a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] self._handler.close() @@ -173,7 +171,7 @@ def send(self, event_data): error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: self.reconnect() except Exception as e: error = EventHubError("Send failed: {}".format(e)) @@ -215,7 +213,7 @@ def wait(self): error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: self.reconnect() except Exception as e: raise EventHubError("Send failed: {}".format(e)) From f02c954a9cf731e451399af0955fc62660734d2f Mon Sep 17 00:00:00 2001 From: annatisch Date: Thu, 26 Jul 2018 14:56:16 -0700 Subject: [PATCH 11/11] bumped version --- HISTORY.rst | 13 +++++++++++++ azure/eventhub/__init__.py | 2 +- azure/eventhub/sender.py | 2 +- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index c90d34f..89db3a3 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,19 @@ Release History =============== +0.2.0rc2 (2018-07-29) ++++++++++++++++++++++ + +- **Breaking change** `EventData.offset` will now return an object of type `~uamqp.common.Offset` rather than str. + The original string value can be retrieved from `~uamqp.common.Offset.value`. +- Each sender/receiver will now run in its own independent connection. +- Updated uAMQP dependency to 0.2.0 +- Fixed issue with IoTHub clients not being able to retrieve partition information. +- Added support for HTTP proxy settings to both EventHubClient and EPH. +- Added error handling policy to automatically reconnect on retryable error. +- Added keep-alive thread for maintaining an unused connection. + + 0.2.0rc1 (2018-07-06) +++++++++++++++++++++ diff --git a/azure/eventhub/__init__.py b/azure/eventhub/__init__.py index 5182b38..5acadea 100644 --- a/azure/eventhub/__init__.py +++ b/azure/eventhub/__init__.py @@ -3,7 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -__version__ = "0.2.0rc1" +__version__ = "0.2.0rc2" from azure.eventhub.common import EventData, EventHubError, Offset from azure.eventhub.client import EventHubClient diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index 37d6024..ff358d0 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -43,7 +43,7 @@ def __init__(self, client, target, partition=None): self._outcome = None self._condition = None - def open(self): #, connection): + def open(self): """ Open the Sender using the supplied conneciton. If the handler has previously been redirected, the redirect