Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Updates for v1.3.0 (#91)
Browse files Browse the repository at this point in the history
* Added support for storing the state of the Event Processor along the
Checkpoint. Both Checkpoint and the EP state are stored as pickled
objects.

* Fixing pylint complaints.

* Switched from pickle back to JSON for lease persistence.

* Fixes bug when accessing leases that don't contain EP context. Also,
minor renaming.

* Better SAS token support

* Fixed pylint

* Improved auth error handling

* Test stabilization

* Improved stored EPH context

* Updated EPH context storing

* Skip test on OSX

* Skip tests on OSX

Fail due to large message body bug.

* Some cleanup

* Fixed error handling

* Improved SAS token parsing
  • Loading branch information
annatisch authored Jan 29, 2019
1 parent dbae147 commit ff197f3
Show file tree
Hide file tree
Showing 35 changed files with 824 additions and 183 deletions.
20 changes: 17 additions & 3 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,28 @@
Release History
===============

1.2.0
+++++
1.3.0 (2019-01-29)
++++++++++++++++++

**Bugfixes**

- Added support for auto reconnect on token expiration and other auth errors (issue #89).

**Features**

- Added ability to create ServiceBusClient from an existing SAS auth token, including
provding a function to auto-renew that token on expiry.
- Added support for storing a custom EPH context value in checkpoint (PR #84, thanks @konstantinmiller)


1.2.0 (2018-11-29)
++++++++++++++++++

- Support for Python 2.7 in azure.eventhub module (azure.eventprocessorhost will not support Python 2.7).
- Parse EventData.enqueued_time as a UTC timestamp (issue #72, thanks @vjrantal)


1.1.1 (2019-10-03)
1.1.1 (2018-10-03)
++++++++++++++++++

- Fixed bug in Azure namespace package.
Expand Down
2 changes: 1 addition & 1 deletion azure/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

__path__ = __import__('pkgutil').extend_path(__path__, __name__)
__path__ = __import__('pkgutil').extend_path(__path__, __name__)
2 changes: 1 addition & 1 deletion azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "1.2.0"
__version__ = "1.3.0"

from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient
Expand Down
17 changes: 14 additions & 3 deletions azure/eventhub/async_ops/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
SendClientAsync,
ReceiveClientAsync)

from azure.eventhub.common import parse_sas_token
from azure.eventhub import (
Sender,
Receiver,
Expand All @@ -37,18 +38,28 @@ class EventHubClientAsync(EventHubClient):
sending events to and receiving events from the Azure Event Hubs service.
"""

def _create_auth(self, username=None, password=None): # pylint: disable=no-self-use
def _create_auth(self, username=None, password=None):
"""
Create an ~uamqp.authentication.cbs_auth_async.SASTokenAuthAsync instance to authenticate
the session.
:param auth_uri: The URI to authenticate against.
:type auth_uri: str
:param username: The name of the shared access policy.
:type username: str
:param password: The shared access key.
:type password: str
"""
if self.sas_token:
token = self.sas_token() if callable(self.sas_token) else self.sas_token
try:
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
raise ValueError("Supplied SAS token has no valid expiry value.")
return authentication.SASTokenAsync(
self.auth_uri, self.auth_uri, token,
expires_at=expiry,
timeout=self.auth_timeout,
http_proxy=self.http_proxy)

username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
Expand Down
20 changes: 19 additions & 1 deletion azure/eventhub/async_ops/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ async def open_async(self):
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)

async def reconnect_async(self):
async def reconnect_async(self): # pylint: disable=too-many-statements
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
# pylint: disable=protected-access
Expand All @@ -134,6 +134,11 @@ async def reconnect_async(self):
await self._handler.open_async()
while not await self._handler.client_ready_async():
await asyncio.sleep(0.05)
except errors.TokenExpired as shutdown:
log.info("AsyncReceiver disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
Expand All @@ -152,6 +157,15 @@ async def reconnect_async(self):
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("AsyncReceiver couldn't authenticate. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncReceiver connection error (%r). Shutting down.", e)
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Receiver reconnect failed: {}".format(e))
Expand Down Expand Up @@ -232,6 +246,10 @@ async def receive(self, max_batch_size=None, timeout=None):
self.offset = event_data.offset
data_batch.append(event_data)
return data_batch
except (errors.TokenExpired, errors.AuthenticationException):
log.info("AsyncReceiver disconnected due to token error. Attempting reconnect.")
await self.reconnect_async()
return data_batch
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
Expand Down
20 changes: 20 additions & 0 deletions azure/eventhub/async_ops/sender_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ async def reconnect_async(self):
await self._handler.open_async()
self._handler.queue_message(*unsent_events)
await self._handler.wait_async()
except errors.TokenExpired as shutdown:
log.info("AsyncSender disconnected due to token expiry. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
Expand All @@ -137,6 +142,15 @@ async def reconnect_async(self):
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.AMQPConnectionError as shutdown:
if str(shutdown).startswith("Unable to open authentication session") and self.auto_reconnect:
log.info("AsyncSender couldn't authenticate. Attempting reconnect.")
await self.reconnect_async()
else:
log.info("AsyncSender connection error (%r). Shutting down.", e)
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except Exception as e:
log.info("Unexpected error occurred (%r). Shutting down.", e)
error = EventHubError("Sender reconnect failed: {}".format(e))
Expand Down Expand Up @@ -211,6 +225,9 @@ async def send(self, event_data):
await self._handler.send_message_async(event_data.message)
if self._outcome != constants.MessageSendResult.Ok:
raise Sender._error(self._outcome, self._condition)
except (errors.TokenExpired, errors.AuthenticationException):
log.info("AsyncSender disconnected due to token error. Attempting reconnect.")
await self.reconnect_async()
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
Expand Down Expand Up @@ -247,6 +264,9 @@ async def wait_async(self):
raise ValueError("Unable to send until client has been started.")
try:
await self._handler.wait_async()
except (errors.TokenExpired, errors.AuthenticationException):
log.info("AsyncSender disconnected due to token error. Attempting reconnect.")
await self.reconnect_async()
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncSender detached. Attempting reconnect.")
Expand Down
59 changes: 51 additions & 8 deletions azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
from azure.eventhub import __version__
from azure.eventhub.sender import Sender
from azure.eventhub.receiver import Receiver
from azure.eventhub.common import EventHubError
from azure.eventhub.common import EventHubError, parse_sas_token


log = logging.getLogger(__name__)

Expand Down Expand Up @@ -90,7 +91,9 @@ class EventHubClient(object):
events to and receiving events from the Azure Event Hubs service.
"""

def __init__(self, address, username=None, password=None, debug=False, http_proxy=None, auth_timeout=60):
def __init__(
self, address, username=None, password=None, debug=False,
http_proxy=None, auth_timeout=60, sas_token=None):
"""
Constructs a new EventHubClient with the given address URL.
Expand All @@ -113,8 +116,13 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox
:param auth_timeout: The time in seconds to wait for a token to be authorized by the service.
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
:type auth_timeout: int
:param sas_token: A SAS token or function that returns a SAS token. If a function is supplied,
it will be used to retrieve subsequent tokens in the case of token expiry. The function should
take no arguments.
:type sas_token: str or callable
"""
self.container_id = "eventhub.pysdk-" + str(uuid.uuid4())[:8]
self.sas_token = sas_token
self.address = urlparse(address)
self.eh_name = self.address.path.lstrip('/')
self.http_proxy = http_proxy
Expand All @@ -123,8 +131,8 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox
username = username or url_username
url_password = unquote_plus(self.address.password) if self.address.password else None
password = password or url_password
if not username or not password:
raise ValueError("Missing username and/or password.")
if (not username or not password) and not sas_token:
raise ValueError("Please supply either username and password, or a SAS token")
self.auth_uri = "sb://{}{}".format(self.address.hostname, self.address.path)
self._auth_config = {'username': username, 'password': password}
self.get_auth = functools.partial(self._create_auth)
Expand All @@ -136,9 +144,34 @@ def __init__(self, address, username=None, password=None, debug=False, http_prox
log.info("%r: Created the Event Hub client", self.container_id)

@classmethod
def from_connection_string(cls, conn_str, eventhub=None, **kwargs):
def from_sas_token(cls, address, sas_token, eventhub=None, **kwargs):
"""Create an EventHubClient from an existing auth token or token generator.
:param address: The Event Hub address URL
:type address: str
:param sas_token: A SAS token or function that returns a SAS token. If a function is supplied,
it will be used to retrieve subsequent tokens in the case of token expiry. The function should
take no arguments.
:type sas_token: str or callable
:param eventhub: The name of the EventHub, if not already included in the address URL.
:type eventhub: str
:param debug: Whether to output network trace logs to the logger. Default
is `False`.
:type debug: bool
:param http_proxy: HTTP proxy settings. This must be a dictionary with the following
keys: 'proxy_hostname' (str value) and 'proxy_port' (int value).
Additionally the following keys may also be present: 'username', 'password'.
:type http_proxy: dict[str, Any]
:param auth_timeout: The time in seconds to wait for a token to be authorized by the service.
The default value is 60 seconds. If set to 0, no timeout will be enforced from the client.
:type auth_timeout: int
"""
Create an EventHubClient from a connection string.
address = _build_uri(address, eventhub)
return cls(address, sas_token=sas_token, **kwargs)

@classmethod
def from_connection_string(cls, conn_str, eventhub=None, **kwargs):
"""Create an EventHubClient from a connection string.
:param conn_str: The connection string.
:type conn_str: str
Expand Down Expand Up @@ -196,13 +229,23 @@ def _create_auth(self, username=None, password=None):
Create an ~uamqp.authentication.SASTokenAuth instance to authenticate
the session.
:param auth_uri: The URI to authenticate against.
:type auth_uri: str
:param username: The name of the shared access policy.
:type username: str
:param password: The shared access key.
:type password: str
"""
if self.sas_token:
token = self.sas_token() if callable(self.sas_token) else self.sas_token
try:
expiry = int(parse_sas_token(token)['se'])
except (KeyError, TypeError, IndexError):
raise ValueError("Supplied SAS token has no valid expiry value.")
return authentication.SASTokenAuth(
self.auth_uri, self.auth_uri, token,
expires_at=expiry,
timeout=self.auth_timeout,
http_proxy=self.http_proxy)

username = username or self._auth_config['username']
password = password or self._auth_config['password']
if "@sas.root" in username:
Expand Down
16 changes: 16 additions & 0 deletions azure/eventhub/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,22 @@ def _error_handler(error):
return errors.ErrorAction(retry=True)


def parse_sas_token(sas_token):
"""Parse a SAS token into its components.
:param sas_token: The SAS token.
:type sas_token: str
:rtype: dict[str, str]
"""
sas_data = {}
token = sas_token.partition(' ')[2]
fields = token.split('&')
for field in fields:
key, value = field.split('=', 1)
sas_data[key.lower()] = value
return sas_data


class EventData(object):
"""
The EventData class is a holder of event content.
Expand Down
Loading

0 comments on commit ff197f3

Please sign in to comment.