diff --git a/azure/eventhub/_async/__init__.py b/azure/eventhub/_async/__init__.py index 16c6b77..93a60c5 100644 --- a/azure/eventhub/_async/__init__.py +++ b/azure/eventhub/_async/__init__.py @@ -69,7 +69,7 @@ async def _close_clients_async(self): async def _wait_for_client(self, client): try: while client.get_handler_state().value == 2: - await client._handler._connection.work_async() + await client._handler._connection.work_async() # pylint: disable=protected-access except Exception as exp: # pylint: disable=broad-except await client.close_async(exception=exp) @@ -182,10 +182,8 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30 :rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync """ path = self.address.path + operation if operation else self.address.path - source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format( self.address.hostname, path, consumer_group, partition) - print("RECEIVER_PATH", source_url) handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop) self.clients.append(handler) return handler diff --git a/azure/eventhub/_async/receiver_async.py b/azure/eventhub/_async/receiver_async.py index 072e04a..6ba2c3a 100644 --- a/azure/eventhub/_async/receiver_async.py +++ b/azure/eventhub/_async/receiver_async.py @@ -68,6 +68,7 @@ async def open_async(self): :param connection: The underlying client shared connection. :type: connection: ~uamqp._async.connection_async.ConnectionAsync """ + # pylint: disable=protected-access if self.redirected: self.source = self.redirected.address source = Source(self.source) @@ -93,6 +94,7 @@ async def open_async(self): async def reconnect_async(self): """If the Receiver was disconnected from the service with a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access alt_creds = { "username": self.client._auth_config.get("iot_username"), "password":self.client._auth_config.get("iot_password")} @@ -194,7 +196,7 @@ async def receive(self, max_batch_size=None, timeout=None): error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: await self.reconnect_async() return data_batch except Exception as e: diff --git a/azure/eventhub/_async/sender_async.py b/azure/eventhub/_async/sender_async.py index cf7174e..42865f3 100644 --- a/azure/eventhub/_async/sender_async.py +++ b/azure/eventhub/_async/sender_async.py @@ -70,11 +70,12 @@ async def open_async(self): loop=self.loop) await self._handler.open_async() while not await self.has_started(): - await self._handler._connection.work_async() + await self._handler._connection.work_async() # pylint: disable=protected-access async def reconnect_async(self): """If the Receiver was disconnected from the service with a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] await self._handler.close_async() @@ -130,7 +131,7 @@ async def close_async(self, exception=None): elif isinstance(exception, EventHubError): self.error = exception elif isinstance(exception, (errors.LinkDetach, errors.ConnectionClose)): - self.error = EventHubError(str(error), error) + self.error = EventHubError(str(exception), exception) elif exception: self.error = EventHubError(str(exception)) else: @@ -163,7 +164,7 @@ async def send(self, event_data): error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: await self.reconnect_async() except Exception as e: error = EventHubError("Send failed: {}".format(e)) @@ -187,7 +188,7 @@ async def wait_async(self): error = EventHubError(str(shutdown), shutdown) await self.close_async(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: await self.reconnect_async() except Exception as e: raise EventHubError("Send failed: {}".format(e)) diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 6ac4dde..9c57cbd 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -15,7 +15,6 @@ from urllib.parse import urlparse, unquote_plus, urlencode, quote_plus import uamqp -from uamqp import Connection from uamqp import Message from uamqp import authentication from uamqp import constants @@ -152,11 +151,11 @@ def from_iothub_connection_string(cls, conn_str, **kwargs): username = "{}@sas.root.{}".format(policy, hub_name) password = _generate_sas_token(address, policy, key) client = cls("amqps://" + address, username=username, password=password, **kwargs) - client._auth_config = { + client._auth_config = { # pylint: disable=protected-access 'iot_username': policy, 'iot_password': key, 'username': username, - 'password': password} # pylint: disable=protected-access + 'password': password} return client def _create_auth(self, username=None, password=None): diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index a89eebc..035a812 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -257,7 +257,7 @@ def __init__(self, message, details=None): self._parse_error(details.description) for detail in self.details: self.message += "\n{}".format(detail) - except: + except: # pylint: disable=bare-except self.message += "\n{}".format(details) super(EventHubError, self).__init__(self.message) @@ -268,7 +268,7 @@ def _parse_error(self, error_list): if details_index >= 0: details_msg = self.message[details_index + 1:] self.message = self.message[0:details_index] - + tracking_index = details_msg.index(", TrackingId:") system_index = details_msg.index(", SystemTracker:") timestamp_index = details_msg.index(", Timestamp:") diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index 7e0c5e6..49a15ce 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -6,7 +6,7 @@ from uamqp import types, errors from uamqp import ReceiveClient, Source -from azure.eventhub.common import EventHubError, EventData, Offset, _error_handler +from azure.eventhub.common import EventHubError, EventData, _error_handler class Receiver: @@ -64,6 +64,7 @@ def open(self): :param connection: The underlying client shared connection. :type: connection: ~uamqp.connection.Connection """ + # pylint: disable=protected-access if self.redirected: self.source = self.redirected.address source = Source(self.source) @@ -89,6 +90,7 @@ def open(self): def reconnect(self): """If the Receiver was disconnected from the service with a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access alt_creds = { "username": self.client._auth_config.get("iot_username"), "password":self.client._auth_config.get("iot_password")} @@ -209,7 +211,7 @@ def receive(self, max_batch_size=None, timeout=None): error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: self.reconnect() return data_batch except Exception as e: diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index e4455c4..37d6024 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -3,9 +3,6 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -import time - -import uamqp from uamqp import constants, errors from uamqp import SendClient @@ -65,13 +62,14 @@ def open(self): #, connection): error_policy=self.retry_policy, keep_alive_interval=30, properties=self.client.create_properties()) - self._handler.open() #connection) + self._handler.open() while not self.has_started(): - self._handler._connection.work() + self._handler._connection.work() # pylint: disable=protected-access def reconnect(self): """If the Sender was disconnected from the service with a retryable error - attempt to reconnect.""" + # pylint: disable=protected-access pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent) unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states] self._handler.close() @@ -173,7 +171,7 @@ def send(self, event_data): error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: self.reconnect() except Exception as e: error = EventHubError("Send failed: {}".format(e)) @@ -215,7 +213,7 @@ def wait(self): error = EventHubError(str(shutdown), shutdown) self.close(exception=error) raise error - except (errors.MessageHandlerError): + except errors.MessageHandlerError: self.reconnect() except Exception as e: raise EventHubError("Send failed: {}".format(e))