Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Restructure for independent connections
Browse files Browse the repository at this point in the history
  • Loading branch information
annatisch committed Jul 26, 2018
1 parent 1ba74a5 commit 62c8e83
Show file tree
Hide file tree
Showing 7 changed files with 304 additions and 221 deletions.
62 changes: 13 additions & 49 deletions azure/eventhub/_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from uamqp import authentication, constants, types, errors
from uamqp import (
Message,
Source,
ConnectionAsync,
AMQPClientAsync,
SendClientAsync,
Expand Down Expand Up @@ -59,29 +58,6 @@ def _create_auth(self, username=None, password=None): # pylint: disable=no-self
return authentication.SASLPlain(self.address.hostname, username, password)
return authentication.SASTokenAsync.from_shared_access_key(self.auth_uri, username, password, timeout=60)

def _create_connection_async(self):
"""
Create a new ~uamqp._async.connection_async.ConnectionAsync instance that will be shared between all
AsyncSender/AsyncReceiver clients.
"""
if not self.connection:
log.info("{}: Creating connection with address={}".format(
self.container_id, self.address.geturl()))
self.connection = ConnectionAsync(
self.address.hostname,
self.auth,
container_id=self.container_id,
properties=self._create_properties(),
debug=self.debug)

async def _close_connection_async(self):
"""
Close and destroy the connection async.
"""
if self.connection:
await self.connection.destroy_async()
self.connection = None

async def _close_clients_async(self):
"""
Close all open AsyncSender/AsyncReceiver clients.
Expand All @@ -91,17 +67,13 @@ async def _close_clients_async(self):
async def _wait_for_client(self, client):
try:
while client.get_handler_state().value == 2:
await self.connection.work_async()
await client._handler._connection.work_async()
except Exception as exp: # pylint: disable=broad-except
await client.close_async(exception=exp)

async def _start_client_async(self, client):
try:
await client.open_async(self.connection)
started = await client.has_started()
while not started:
await self.connection.work_async()
started = await client.has_started()
await client.open_async()
except Exception as exp: # pylint: disable=broad-except
await client.close_async(exception=exp)

Expand All @@ -114,13 +86,8 @@ async def _handle_redirect(self, redirects):
redirects = [c.redirected for c in self.clients if c.redirected]
if not all(r.hostname == redirects[0].hostname for r in redirects):
raise EventHubError("Multiple clients attempting to redirect to different hosts.")
self.auth_uri = redirects[0].address.decode('utf-8')
self.auth = self._create_auth()
new_target, _, _ = self.auth_uri.partition("/ConsumerGroups")
self.address = urlparse(new_target)
self.mgmt_node = new_target.encode('UTF-8') + b"/$management"
await self.connection.redirect_async(redirects[0], self.auth)
await asyncio.gather(*[c.open_async(self.connection) for c in self.clients])
self._process_redirect_uri(redirects[0])
await asyncio.gather(*[c.open_async() for c in self.clients])

async def run_async(self):
"""
Expand All @@ -135,7 +102,6 @@ async def run_async(self):
:rtype: list[~azure.eventhub.common.EventHubError]
"""
log.info("{}: Starting {} clients".format(self.container_id, len(self.clients)))
self._create_connection_async()
tasks = [self._start_client_async(c) for c in self.clients]
try:
await asyncio.gather(*tasks)
Expand Down Expand Up @@ -163,26 +129,25 @@ async def stop_async(self):
log.info("{}: Stopping {} clients".format(self.container_id, len(self.clients)))
self.stopped = True
await self._close_clients_async()
await self._close_connection_async()

async def get_eventhub_info_async(self):
"""
Get details on the specified EventHub async.
:rtype: dict
"""
eh_name = self.address.path.lstrip('/')
target = "amqps://{}/{}".format(self.address.hostname, eh_name)
alt_creds = {
"username": self._auth_config.get("iot_username"),
"password":self._auth_config.get("iot_password")}
try:
mgmt_auth = self._create_auth()
mgmt_client = AMQPClientAsync(target, auth=mgmt_auth, debug=self.debug)
mgmt_auth = self._create_auth(**alt_creds)
mgmt_client = AMQPClientAsync(self.mgmt_target, auth=mgmt_auth, debug=self.debug)
await mgmt_client.open_async()
mgmt_msg = Message(application_properties={'name': eh_name})
mgmt_msg = Message(application_properties={'name': self.eh_name})
response = await mgmt_client.mgmt_request_async(
mgmt_msg,
constants.READ_OPERATION,
op_type=b'com.microsoft:eventhub',
node=self.mgmt_node,
status_code_field=b'status-code',
description_fields=b'status-description')
eh_info = response.get_data()
Expand Down Expand Up @@ -215,12 +180,11 @@ def add_async_receiver(self, consumer_group, partition, offset=None, prefetch=30
:rtype: ~azure.eventhub._async.receiver_async.ReceiverAsync
"""
path = self.address.path + operation if operation else self.address.path

source_url = "amqps://{}{}/ConsumerGroups/{}/Partitions/{}".format(
self.address.hostname, path, consumer_group, partition)
source = Source(source_url)
if offset is not None:
source.set_filter(offset.selector())
handler = AsyncReceiver(self, source, prefetch=prefetch, loop=loop)
print("RECEIVER_PATH", source_url)
handler = AsyncReceiver(self, source_url, offset=offset, prefetch=prefetch, loop=loop)
self.clients.append(handler)
return handler

Expand Down
91 changes: 69 additions & 22 deletions azure/eventhub/_async/receiver_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@
import asyncio

from uamqp import errors, types
from uamqp import ReceiveClientAsync
from uamqp import ReceiveClientAsync, Source

from azure.eventhub import EventHubError, EventData
from azure.eventhub.receiver import Receiver
from azure.eventhub.common import _error_handler


class AsyncReceiver(Receiver):
"""
Implements the async API of a Receiver.
"""

def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called
def __init__(self, client, source, offset=None, prefetch=300, epoch=None, loop=None): # pylint: disable=super-init-not-called
"""
Instantiate an async receiver.
Expand All @@ -33,25 +34,32 @@ def __init__(self, client, source, prefetch=300, epoch=None, loop=None): # pyli
:param loop: An event loop.
"""
self.loop = loop or asyncio.get_event_loop()
self.client = client
self.source = source
self.offset = offset
self.prefetch = prefetch
self.epoch = epoch
self.retry_policy = errors.ErrorPolicy(max_retries=3, on_error=_error_handler)
self.redirected = None
self.error = None
self.debug = client.debug
self.offset = None
self.prefetch = prefetch
self.properties = None
self.epoch = epoch
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
if epoch:
self.properties = {types.AMQPSymbol(self._epoch): types.AMQPLong(int(epoch))}
self._handler = ReceiveClientAsync(
source,
auth=client.auth,
debug=self.debug,
auth=self.client.get_auth(),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
loop=self.loop)

async def open_async(self, connection):
async def open_async(self):
"""
Open the Receiver using the supplied conneciton.
If the handler has previously been redirected, the redirect
Expand All @@ -61,15 +69,51 @@ async def open_async(self, connection):
:type: connection: ~uamqp._async.connection_async.ConnectionAsync
"""
if self.redirected:
self.source = self.redirected.address
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
self._handler = ReceiveClientAsync(
self.redirected.address,
auth=None,
debug=self.debug,
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
loop=self.loop)
await self._handler.open_async(connection=connection)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()

async def reconnect_async(self):
"""If the Receiver was disconnected from the service with
a retryable error - attempt to reconnect."""
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
await self._handler.close_async()
source = Source(self.source)
if self.offset is not None:
source.set_filter(self.offset.selector())
self._handler = ReceiveClientAsync(
source,
auth=self.client.get_auth(**alt_creds),
debug=self.client.debug,
prefetch=self.prefetch,
link_properties=self.properties,
timeout=self.timeout,
error_policy=self.retry_policy,
keep_alive_interval=30,
properties=self.client.create_properties(),
loop=self.loop)
await self._handler.open_async()
while not await self.has_started():
await self._handler._connection.work_async()

async def has_started(self):
"""
Expand Down Expand Up @@ -131,25 +175,28 @@ async def receive(self, max_batch_size=None, timeout=None):
"""
if self.error:
raise self.error
data_batch = []
try:
timeout_ms = 1000 * timeout if timeout else 0
message_batch = await self._handler.receive_message_batch_async(
max_batch_size=max_batch_size,
timeout=timeout_ms)
data_batch = []
for message in message_batch:
event_data = EventData(message=message)
self.offset = event_data.offset
data_batch.append(event_data)
return data_batch
except errors.LinkDetach as detach:
error = EventHubError(str(detach), detach)
await self.close_async(exception=error)
raise error
except errors.ConnectionClose as close:
error = EventHubError(str(close), close)
await self.close_async(exception=error)
raise error
except (errors.LinkDetach, errors.ConnectionClose) as shutdown:
if shutdown.action.retry:
await self.reconnect_async()
return data_batch
else:
error = EventHubError(str(shutdown), shutdown)
await self.close_async(exception=error)
raise error
except (errors.MessageHandlerError):
await self.reconnect_async()
return data_batch
except Exception as e:
error = EventHubError("Receive failed: {}".format(e))
await self.close_async(exception=error)
Expand Down
Loading

0 comments on commit 62c8e83

Please sign in to comment.