Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Merge pull request #51 from annatisch/eh_scenarios
Browse files Browse the repository at this point in the history
EPH improvements
  • Loading branch information
annatisch authored Aug 7, 2018
2 parents dd38016 + 0c27572 commit 97137ac
Show file tree
Hide file tree
Showing 16 changed files with 216 additions and 61 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,5 @@ ENV/
azure/mgmt/
azure/common/
azure/profiles/
azure/servicebus/
features/steps/mgmt_settings_real.py
16 changes: 16 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,22 @@
Release History
===============

0.2.0 (2018-08-06)
++++++++++++++++++

- Stability improvements for EPH.
- Updated uAMQP version.
- Added new configuration options for Sender and Receiver; `keep_alive` and `auto_reconnect`.
These flags have been added to the following:

- `EventHubClient.add_receiver`
- `EventHubClient.add_sender`
- `EventHubClientAsync.add_async_receiver`
- `EventHubClientAsync.add_async_sender`
- `EPHOptions.keey_alive_interval`
- `EPHOptions.auto_reconnect_on_error`


0.2.0rc2 (2018-07-29)
+++++++++++++++++++++

Expand Down
2 changes: 1 addition & 1 deletion azure/eventhub/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

__version__ = "0.2.0rc2"
__version__ = "0.2.0"

from azure.eventhub.common import EventData, EventHubError, Offset
from azure.eventhub.client import EventHubClient
Expand Down
23 changes: 17 additions & 6 deletions azure/eventhub/_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ async def _start_client_async(self, client):
try:
await client.open_async()
except Exception as exp: # pylint: disable=broad-except
log.info("Encountered error while starting handler: {}".format(exp))
await client.close_async(exception=exp)

async def _handle_redirect(self, redirects):
Expand Down Expand Up @@ -164,7 +165,9 @@ async def get_eventhub_info_async(self):
finally:
await mgmt_client.close_async()

def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=300, operation=None, loop=None):
def add_async_receiver(
self, consumer_group, partition, offset=None, prefetch=300,
operation=None, keep_alive=30, auto_reconnect=True, loop=None):
"""
Add an async receiver to the client for a particular consumer group and partition.
Expand All @@ -184,11 +187,15 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition)
handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop)
handler = AsyncReceiver(
self, source_url, offset=offset, prefetch=prefetch,
keep_alive=keep_alive, auto_reconnect=auto_reconnect, loop=loop)
self.clients.append(handler)
return handler

def add_async_epoch_receiver(self, consumer_group, partition, epoch, prefetch=300, operation=None, loop=None):
def add_async_epoch_receiver(
self, consumer_group, partition, epoch, prefetch=300,
operation=None, keep_alive=30, auto_reconnect=True, loop=None):
"""
Add an async receiver to the client with an epoch value. Only a single epoch receiver
can connect to a partition at any given time - additional epoch receivers must have
Expand All @@ -211,11 +218,13 @@ def add_async_epoch_receiver(self, consumer_group, partition, epoch, prefetch=30
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition)
handler = AsyncReceiver(self, source_url, prefetch=prefetch, epoch=epoch, loop=loop)
handler = AsyncReceiver(
self, source_url, prefetch=prefetch, epoch=epoch,
keep_alive=keep_alive, auto_reconnect=auto_reconnect, loop=loop)
self.clients.append(handler)
return handler

def add_async_sender(self, partition=None, operation=None, loop=None):
def add_async_sender(self, partition=None, operation=None, keep_alive=30, auto_reconnect=True, loop=None):
"""
Add an async sender to the client to send ~azure.eventhub.common.EventData object
to an EventHub.
Expand All @@ -232,6 +241,8 @@ def add_async_sender(self, partition=None, operation=None, loop=None):
target = "amqps://{}{}".format(self.address.hostname, self.address.path)
if operation:
target = target + operation
handler = AsyncSender(self, target, partition=partition, loop=loop)
handler = AsyncSender(
self, target, partition=partition, keep_alive=keep_alive,
auto_reconnect=auto_reconnect, loop=loop)
self.clients.append(handler)
return handler
41 changes: 33 additions & 8 deletions azure/eventhub/_async/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
# --------------------------------------------------------------------------------------------

import asyncio
import uuid
import logging

from uamqp import errors, types
from uamqp import ReceiveClientAsync, Source
Expand All @@ -12,13 +14,17 @@
from azure.eventhub.receiver import Receiver
from azure.eventhub.common import _error_handler

log = logging.getLogger(__name__)


class AsyncReceiver(Receiver):
"""
Implements the async API of a Receiver.
"""

def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called
def __init__( # pylint: disable=super-init-not-called
self, client, source, offset=None, prefetch=300, epoch=None,
keep_alive=None, auto_reconnect=True, loop=None):
"""
Instantiate an async receiver.
Expand All @@ -39,10 +45,14 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=N
self.offset = offset
self.prefetch = prefetch
self.epoch = epoch
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.redirected = None
self.error = None
self.properties = None
partition = self.source.split('/')[-1]
self.name = "EHReceiver-{}-partition{}".format(uuid.uuid4(), partition)
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
Expand All @@ -56,7 +66,9 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=N
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)

async def open_async(self):
Expand Down Expand Up @@ -85,7 +97,9 @@ async def open_async(self):
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
Expand All @@ -110,7 +124,8 @@ async def reconnect_async(self):
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
Expand Down Expand Up @@ -189,17 +204,27 @@ async def receive(self, max_batch_size=None, timeout=None):
data_batch.append(event_data)
return data_batch
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry:
if shutdown.action.retry and self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
return data_batch
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
log.info("AsyncReceiver detached. Attempting reconnect.")
await self.reconnect_async()
return data_batch
else:
log.info("AsyncReceiver detached. Shutting down.")
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError:
await self.reconnect_async()
return data_batch
except Exception as e:
log.info("Unexpected error occurred ({}). Shutting down.".format(e))
error = EventHubError("Receive failed: {}".format(e))
await self.close_async(exception=error)
raise error
38 changes: 28 additions & 10 deletions azure/eventhub/_async/sender_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Licensed under the MIT License. See License.txt in the project root for license information.
# --------------------------------------------------------------------------------------------

import uuid
import asyncio

from uamqp import constants, errors
Expand All @@ -17,7 +18,7 @@ class AsyncSender(Sender):
Implements the async API of a Sender.
"""

def __init__(self, client, target, partition=None, loop=None): # pylint: disable=super-init-not-called
def __init__(self, client, target, partition=None, keep_alive=None, auto_reconnect=True, loop=None): # pylint: disable=super-init-not-called
"""
Instantiate an EventHub event SenderAsync handler.
Expand All @@ -31,18 +32,23 @@ def __init__(self, client, target, partition=None, loop=None): # pylint: disabl
self.client = client
self.target = target
self.partition = partition
self.keep_alive = keep_alive
self.auto_reconnect = auto_reconnect
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.name = "EHSender-{}".format(uuid.uuid4())
self.redirected = None
self.error = None
if partition:
self.target += "/Partitions/" + partition
self.name += "-partition{}".format(partition)
self._handler = SendClientAsync(
self.target,
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=Sender.TIMEOUT,
error_policy=self.retry_policy,
keep_alive_interval=30,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
self._outcome = None
Expand All @@ -65,7 +71,8 @@ async def open_async(self):
debug=self.client.debug,
msg_timeout=Sender.TIMEOUT,
error_policy=self.retry_policy,
keep_alive_interval=30,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
Expand All @@ -85,7 +92,8 @@ async def reconnect_async(self):
debug=self.client.debug,
msg_timeout=Sender.TIMEOUT,
error_policy=self.retry_policy,
keep_alive_interval=30,
keep_alive_interval=self.keep_alive,
client_name=self.name,
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
Expand Down Expand Up @@ -158,14 +166,19 @@ async def send(self, event_data):
if self._outcome != constants.MessageSendResult.Ok:
raise Sender._error(self._outcome, self._condition)
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry:
if shutdown.action.retry and self.auto_reconnect:
await self.reconnect_async()
else:
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
await self.reconnect_async()
else:
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError:
await self.reconnect_async()
except Exception as e:
error = EventHubError("Send failed: {}".format(e))
await self.close_async(exception=error)
Expand All @@ -182,13 +195,18 @@ async def wait_async(self):
try:
await self._handler.wait_async()
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry:
if shutdown.action.retry and self.auto_reconnect:
await self.reconnect_async()
else:
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError as shutdown:
if self.auto_reconnect:
await self.reconnect_async()
else:
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except errors.MessageHandlerError:
await self.reconnect_async()
except Exception as e:
raise EventHubError("Send failed: {}".format(e))
20 changes: 14 additions & 6 deletions azure/eventhub/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ def get_eventhub_info(self):
finally:
mgmt_client.close()

def add_receiver(self, consumer_group, partition, offset=None, prefetch=300, operation=None):
def add_receiver(
self, consumer_group, partition, offset=None, prefetch=300,
operation=None, keep_alive=30, auto_reconnect=True):
"""
Add a receiver to the client for a particular consumer group and partition.
Expand All @@ -323,11 +325,15 @@ def add_receiver(self, consumer_group, partition, offset=None, prefetch=300, ope
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition)
handler = Receiver(self, source_url, offset=offset, prefetch=prefetch)
handler = Receiver(
self, source_url, offset=offset, prefetch=prefetch,
keep_alive=keep_alive, auto_reconnect=auto_reconnect)
self.clients.append(handler)
return handler

def add_epoch_receiver(self, consumer_group, partition, epoch, prefetch=300, operation=None):
def add_epoch_receiver(
self, consumer_group, partition, epoch, prefetch=300,
operation=None, keep_alive=30, auto_reconnect=True):
"""
Add a receiver to the client with an epoch value. Only a single epoch receiver
can connect to a partition at any given time - additional epoch receivers must have
Expand All @@ -350,11 +356,13 @@ def add_epoch_receiver(self, consumer_group, partition, epoch, prefetch=300, ope
path = self.address.path + operation if operation else self.address.path
source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition)
handler = Receiver(self, source_url, prefetch=prefetch, epoch=epoch)
handler = Receiver(
self, source_url, prefetch=prefetch, epoch=epoch,
keep_alive=keep_alive, auto_reconnect=auto_reconnect)
self.clients.append(handler)
return handler

def add_sender(self, partition=None, operation=None):
def add_sender(self, partition=None, operation=None, keep_alive=30, auto_reconnect=True):
"""
Add a sender to the client to send ~azure.eventhub.common.EventData object
to an EventHub.
Expand All @@ -371,6 +379,6 @@ def add_sender(self, partition=None, operation=None):
target = "amqps://{}{}".format(self.address.hostname, self.address.path)
if operation:
target = target + operation
handler = Sender(self, target, partition=partition)
handler = Sender(self, target, partition=partition, keep_alive=keep_alive, auto_reconnect=auto_reconnect)
self.clients.append(handler)
return handler
Loading

0 comments on commit 97137ac

Please sign in to comment.