diff --git a/.gitignore b/.gitignore index 6b8bf2d..0785980 100644 --- a/.gitignore +++ b/.gitignore @@ -39,6 +39,9 @@ pip-delete-this-directory.txt azure/storage/ azure/common/ azure/profiles/ +*.log.1 +*.log.2 +*.log.3 htmlcov/ .tox/ diff --git a/.travis.yml b/.travis.yml index 17f7c2e..6addbf5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,8 +1,15 @@ language: python cache: pip -python: - - "3.6" -# command to install dependencies +dist: xenial +sudo: required +matrix: + include: + - os: linux + python: "3.5" + - os: linux + python: "3.6" + - os: linux + python: "3.7" install: - pip install -r dev_requirements.txt - pip install -e . diff --git a/HISTORY.rst b/HISTORY.rst index 6e90e55..c77b3c7 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,21 @@ Release History =============== +1.1.0 (2018-09-21) +++++++++++++++++++ + +- Changes to `AzureStorageCheckpointLeaseManager` parameters to support other connection options (issue #61): + + - The `storage_account_name`, `storage_account_key` and `lease_container_name` arguments are now optional keyword arguments. + - Added a `sas_token` argument that must be specified with `storage_account_name` in place of `storage_account_key`. + - Added an `endpoint_suffix` argument to support storage endpoints in National Clouds. + - Added a `connection_string` argument that, if specified, overrides all other endpoint arguments. + - The `lease_container_name` argument now defaults to `"eph-leases"` if not specified. + +- Fix for clients failing to start if run called multipled times (issue #64). +- Added convenience methods `body_as_str` and `body_as_json` to EventData object for easier processing of message data. + + 1.0.0 (2018-08-22) ++++++++++++++++++ diff --git a/azure/eventhub/__init__.py b/azure/eventhub/__init__.py index 3cde06c..e07a603 100644 --- a/azure/eventhub/__init__.py +++ b/azure/eventhub/__init__.py @@ -3,7 +3,7 @@ # Licensed under the MIT License. See License.txt in the project root for license information. # -------------------------------------------------------------------------------------------- -__version__ = "1.0.0" +__version__ = "1.1.0" from azure.eventhub.common import EventData, EventHubError, Offset from azure.eventhub.client import EventHubClient diff --git a/azure/eventhub/async_ops/__init__.py b/azure/eventhub/async_ops/__init__.py index 7774724..7c62b3d 100644 --- a/azure/eventhub/async_ops/__init__.py +++ b/azure/eventhub/async_ops/__init__.py @@ -75,7 +75,8 @@ async def _wait_for_client(self, client): async def _start_client_async(self, client): try: - await client.open_async() + if not client.running: + await client.open_async() except Exception as exp: # pylint: disable=broad-except log.info("Encountered error while starting handler: %r", exp) await client.close_async(exception=exp) diff --git a/azure/eventhub/async_ops/receiver_async.py b/azure/eventhub/async_ops/receiver_async.py index ad04520..814adff 100644 --- a/azure/eventhub/async_ops/receiver_async.py +++ b/azure/eventhub/async_ops/receiver_async.py @@ -40,6 +40,7 @@ def __init__( # pylint: disable=super-init-not-called :param loop: An event loop. """ self.loop = loop or asyncio.get_event_loop() + self.running = False self.client = client self.source = source self.offset = offset @@ -81,6 +82,7 @@ async def open_async(self): :type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync """ # pylint: disable=protected-access + self.running = True if self.redirected: self.source = self.redirected.address source = Source(self.source) @@ -171,12 +173,11 @@ async def has_started(self): timeout, auth_in_progress = await self._handler._auth.handle_token_async() if timeout: raise EventHubError("Authorization timeout.") - elif auth_in_progress: + if auth_in_progress: return False - elif not await self._handler._client_ready_async(): + if not await self._handler._client_ready_async(): return False - else: - return True + return True async def close_async(self, exception=None): """ @@ -188,9 +189,10 @@ async def close_async(self, exception=None): due to an error. :type exception: Exception """ + self.running = False if self.error: return - elif isinstance(exception, errors.LinkRedirect): + if isinstance(exception, errors.LinkRedirect): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception @@ -216,6 +218,8 @@ async def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to receive until client has been started.") data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 @@ -232,21 +236,19 @@ async def receive(self, max_batch_size=None, timeout=None): log.info("AsyncReceiver detached. Attempting reconnect.") await self.reconnect_async() return data_batch - else: - log.info("AsyncReceiver detached. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: log.info("AsyncReceiver detached. Attempting reconnect.") await self.reconnect_async() return data_batch - else: - log.info("AsyncReceiver detached. Shutting down.") - error = EventHubError(str(shutdown), shutdown) - await self.close_async(exception=error) - raise error + log.info("AsyncReceiver detached. Shutting down.") + error = EventHubError(str(shutdown), shutdown) + await self.close_async(exception=error) + raise error except Exception as e: log.info("Unexpected error occurred (%r). Shutting down.", e) error = EventHubError("Receive failed: {}".format(e)) diff --git a/azure/eventhub/async_ops/sender_async.py b/azure/eventhub/async_ops/sender_async.py index 098c026..9f46fdd 100644 --- a/azure/eventhub/async_ops/sender_async.py +++ b/azure/eventhub/async_ops/sender_async.py @@ -47,6 +47,7 @@ def __init__( # pylint: disable=super-init-not-called :param loop: An event loop. If not specified the default event loop will be used. """ self.loop = loop or asyncio.get_event_loop() + self.running = False self.client = client self.target = target self.partition = partition @@ -82,6 +83,7 @@ async def open_async(self): :param connection: The underlying client shared connection. :type: connection: ~uamqp.async_ops.connection_async.ConnectionAsync """ + self.running = True if self.redirected: self.target = self.redirected.address self._handler = SendClientAsync( @@ -156,12 +158,11 @@ async def has_started(self): timeout, auth_in_progress = await self._handler._auth.handle_token_async() if timeout: raise EventHubError("Authorization timeout.") - elif auth_in_progress: + if auth_in_progress: return False - elif not await self._handler._client_ready_async(): + if not await self._handler._client_ready_async(): return False - else: - return True + return True async def close_async(self, exception=None): """ @@ -173,9 +174,10 @@ async def close_async(self, exception=None): due to an error. :type exception: Exception """ + self.running = False if self.error: return - elif isinstance(exception, errors.LinkRedirect): + if isinstance(exception, errors.LinkRedirect): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception @@ -199,6 +201,8 @@ async def send(self, event_data): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") if event_data.partition_key and self.partition: raise ValueError("EventData partition key cannot be used with a partition sender.") event_data.message.on_send_complete = self._on_outcome @@ -238,6 +242,8 @@ async def wait_async(self): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") try: await self._handler.wait_async() except (errors.LinkDetach, errors.ConnectionClose) as shutdown: diff --git a/azure/eventhub/client.py b/azure/eventhub/client.py index 43c3b65..06508df 100644 --- a/azure/eventhub/client.py +++ b/azure/eventhub/client.py @@ -233,7 +233,8 @@ def _close_clients(self): def _start_clients(self): for client in self.clients: try: - client.open() + if not client.running: + client.open() except Exception as exp: # pylint: disable=broad-except client.close(exception=exp) @@ -329,8 +330,6 @@ def get_eventhub_info(self): output['partition_count'] = eh_info[b'partition_count'] output['partition_ids'] = [p.decode('utf-8') for p in eh_info[b'partition_ids']] return output - except: - raise finally: mgmt_client.close() diff --git a/azure/eventhub/common.py b/azure/eventhub/common.py index af4db4e..b4a1755 100644 --- a/azure/eventhub/common.py +++ b/azure/eventhub/common.py @@ -5,6 +5,7 @@ import datetime import time +import json from uamqp import Message, BatchMessage from uamqp import types, constants, errors @@ -31,13 +32,13 @@ def _error_handler(error): """ if error.condition == b'com.microsoft:server-busy': return errors.ErrorAction(retry=True, backoff=4) - elif error.condition == b'com.microsoft:timeout': + if error.condition == b'com.microsoft:timeout': return errors.ErrorAction(retry=True, backoff=2) - elif error.condition == b'com.microsoft:operation-cancelled': + if error.condition == b'com.microsoft:operation-cancelled': return errors.ErrorAction(retry=True) - elif error.condition == b"com.microsoft:container-close": + if error.condition == b"com.microsoft:container-close": return errors.ErrorAction(retry=True, backoff=4) - elif error.condition in _NO_RETRY_ERRORS: + if error.condition in _NO_RETRY_ERRORS: return errors.ErrorAction(retry=False) return errors.ErrorAction(retry=True) @@ -88,7 +89,6 @@ def __init__(self, body=None, batch=None, to_device=None, message=None): else: self.message = Message(body, properties=self.msg_properties) - @property def sequence_number(self): """ @@ -188,7 +188,45 @@ def body(self): :rtype: bytes or Generator[bytes] """ - return self.message.get_data() + try: + return self.message.get_data() + except TypeError: + raise ValueError("Message data empty.") + + def body_as_str(self, encoding='UTF-8'): + """ + The body of the event data as a string if the data is of a + compatible type. + + :param encoding: The encoding to use for decoding message data. + Default is 'UTF-8' + :rtype: str + """ + data = self.body + try: + return "".join(b.decode(encoding) for b in data) + except TypeError: + return str(data) + except: # pylint: disable=bare-except + pass + try: + return data.decode(encoding) + except Exception as e: + raise TypeError("Message data is not compatible with string type: {}".format(e)) + + def body_as_json(self, encoding='UTF-8'): + """ + The body of the event loaded as a JSON object is the data is compatible. + + :param encoding: The encoding to use for decoding message data. + Default is 'UTF-8' + :rtype: dict + """ + data_str = self.body_as_str(encoding=encoding) + try: + return json.loads(data_str) + except Exception as e: + raise TypeError("Event data is not compatible with JSON type: {}".format(e)) class Offset(object): @@ -231,7 +269,7 @@ def selector(self): if isinstance(self.value, datetime.datetime): timestamp = (time.mktime(self.value.timetuple()) * 1000) + (self.value.microsecond/1000) return ("amqp.annotation.x-opt-enqueued-time {} '{}'".format(operator, int(timestamp))).encode('utf-8') - elif isinstance(self.value, int): + if isinstance(self.value, int): return ("amqp.annotation.x-opt-sequence-number {} '{}'".format(operator, self.value)).encode('utf-8') return ("amqp.annotation.x-opt-offset {} '{}'".format(operator, self.value)).encode('utf-8') diff --git a/azure/eventhub/receiver.py b/azure/eventhub/receiver.py index 0b7b8a9..6822149 100644 --- a/azure/eventhub/receiver.py +++ b/azure/eventhub/receiver.py @@ -35,6 +35,7 @@ def __init__(self, client, source, offset=None, prefetch=300, epoch=None, keep_a :param epoch: An optional epoch value. :type epoch: int """ + self.running = False self.client = client self.source = source self.offset = offset @@ -75,6 +76,7 @@ def open(self): :type: connection: ~uamqp.connection.Connection """ # pylint: disable=protected-access + self.running = True if self.redirected: self.source = self.redirected.address source = Source(self.source) @@ -168,12 +170,11 @@ def has_started(self): timeout, auth_in_progress = self._handler._auth.handle_token() if timeout: raise EventHubError("Authorization timeout.") - elif auth_in_progress: + if auth_in_progress: return False - elif not self._handler._client_ready(): + if not self._handler._client_ready(): return False - else: - return True + return True def close(self, exception=None): """ @@ -185,9 +186,10 @@ def close(self, exception=None): due to an error. :type exception: Exception """ + self.running = False if self.error: return - elif isinstance(exception, errors.LinkRedirect): + if isinstance(exception, errors.LinkRedirect): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception @@ -223,6 +225,8 @@ def receive(self, max_batch_size=None, timeout=None): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to receive until client has been started.") data_batch = [] try: timeout_ms = 1000 * timeout if timeout else 0 @@ -238,18 +242,16 @@ def receive(self, max_batch_size=None, timeout=None): if shutdown.action.retry and self.auto_reconnect: self.reconnect() return data_batch - else: - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error except errors.MessageHandlerError as shutdown: if self.auto_reconnect: self.reconnect() return data_batch - else: - error = EventHubError(str(shutdown), shutdown) - self.close(exception=error) - raise error + error = EventHubError(str(shutdown), shutdown) + self.close(exception=error) + raise error except Exception as e: error = EventHubError("Receive failed: {}".format(e)) self.close(exception=error) diff --git a/azure/eventhub/sender.py b/azure/eventhub/sender.py index b7fef5e..b4ed3b7 100644 --- a/azure/eventhub/sender.py +++ b/azure/eventhub/sender.py @@ -40,6 +40,7 @@ def __init__(self, client, target, partition=None, send_timeout=60, keep_alive=N Default value is `True`. :type auto_reconnect: bool """ + self.running = False self.client = client self.target = target self.partition = partition @@ -74,6 +75,7 @@ def open(self): :param connection: The underlying client shared connection. :type: connection: ~uamqp.connection.Connection """ + self.running = True if self.redirected: self.target = self.redirected.address self._handler = SendClient( @@ -152,12 +154,11 @@ def has_started(self): timeout, auth_in_progress = self._handler._auth.handle_token() if timeout: raise EventHubError("Authorization timeout.") - elif auth_in_progress: + if auth_in_progress: return False - elif not self._handler._client_ready(): + if not self._handler._client_ready(): return False - else: - return True + return True def close(self, exception=None): """ @@ -169,9 +170,10 @@ def close(self, exception=None): due to an error. :type exception: Exception """ + self.running = False if self.error: return - elif isinstance(exception, errors.LinkRedirect): + if isinstance(exception, errors.LinkRedirect): self.redirected = exception elif isinstance(exception, EventHubError): self.error = exception @@ -195,6 +197,8 @@ def send(self, event_data): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") if event_data.partition_key and self.partition: raise ValueError("EventData partition key cannot be used with a partition sender.") event_data.message.on_send_complete = self._on_outcome @@ -239,6 +243,8 @@ def transfer(self, event_data, callback=None): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") if event_data.partition_key and self.partition: raise ValueError("EventData partition key cannot be used with a partition sender.") if callback: @@ -251,6 +257,8 @@ def wait(self): """ if self.error: raise self.error + if not self.running: + raise ValueError("Unable to send until client has been started.") try: self._handler.wait() except (errors.LinkDetach, errors.ConnectionClose) as shutdown: diff --git a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py index 8ac3abe..a749bf2 100644 --- a/azure/eventprocessorhost/azure_storage_checkpoint_manager.py +++ b/azure/eventprocessorhost/azure_storage_checkpoint_manager.py @@ -28,14 +28,39 @@ class AzureStorageCheckpointLeaseManager(AbstractCheckpointManager, AbstractLeas Manages checkpoints and lease with azure storage blobs. In this implementation, checkpoints are data that's actually in the lease blob, so checkpoint operations turn into lease operations under the covers. + + :param str storage_account_name: The storage account name. This is used to + authenticate requests signed with an account key and to construct the storage + endpoint. It is required unless a connection string is given. + :param str storage_account_key: The storage account key. This is used for shared key + authentication. If neither account key or sas token is specified, anonymous access + will be used. + :param str lease_container_name: The name of the container that will be used to store + leases. If it does not already exist it will be created. Default value is 'eph-leases'. + :param int lease_renew_interval: The interval in seconds at which EPH will attempt to + renew the lease of a particular partition. Default value is 10. + :param int lease_duration: The duration in seconds of a lease on a partition. + Default value is 30. + :param str sas_token: A shared access signature token to use to authenticate requests + instead of the account key. If account key and sas token are both specified, + account key will be used to sign. If neither are specified, anonymous access will be used. + :param str endpoint_suffix: The host base component of the url, minus the account name. + Defaults to Azure (core.windows.net). Override this to use a National Cloud. + :param str connection_string: If specified, this will override all other endpoint parameters. + See http://azure.microsoft.com/en-us/documentation/articles/storage-configure-connection-string/ + for the connection string format. """ - def __init__(self, storage_account_name, storage_account_key, lease_container_name, - storage_blob_prefix=None, lease_renew_interval=10, lease_duration=30): + def __init__(self, storage_account_name=None, storage_account_key=None, lease_container_name="eph-leases", + storage_blob_prefix=None, lease_renew_interval=10, lease_duration=30, + sas_token=None, endpoint_suffix="core.windows.net", connection_string=None): AbstractCheckpointManager.__init__(self) AbstractLeaseManager.__init__(self, lease_renew_interval, lease_duration) self.storage_account_name = storage_account_name self.storage_account_key = storage_account_key + self.storage_sas_token = sas_token + self.endpoint_suffix = endpoint_suffix + self.connection_string = connection_string self.lease_container_name = lease_container_name self.storage_blob_prefix = storage_blob_prefix self.storage_client = None @@ -47,8 +72,8 @@ def __init__(self, storage_account_name, storage_account_key, lease_container_na self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=32) # Validate storage inputs - if not self.storage_account_name or not self.storage_account_key: - raise ValueError("Need a valid storage account name and key") + if not self.storage_account_name and not self.connection_string: + raise ValueError("Need a valid storage account name or connection string.") if not re.compile(r"^[a-z0-9](([a-z0-9\-[^\-])){1,61}[a-z0-9]$").match(self.lease_container_name): raise ValueError("Azure Storage lease container name is invalid.\ Please check naming conventions at\ @@ -68,6 +93,9 @@ def initialize(self, host): self.host = host self.storage_client = BlockBlobService(account_name=self.storage_account_name, account_key=self.storage_account_key, + sas_token=self.storage_sas_token, + endpoint_suffix=self.endpoint_suffix, + connection_string=self.connection_string, request_session=self.request_session) self.consumer_group_directory = self.storage_blob_prefix + self.host.eh_config.consumer_group diff --git a/azure/eventprocessorhost/eh_partition_pump.py b/azure/eventprocessorhost/eh_partition_pump.py index 4ebd6a9..e0aa25d 100644 --- a/azure/eventprocessorhost/eh_partition_pump.py +++ b/azure/eventprocessorhost/eh_partition_pump.py @@ -36,8 +36,8 @@ async def on_open_async(self): _opened_ok = True except Exception as err: # pylint: disable=broad-except _logger.warning( - "%r,%r PartitionPumpWarning: Failure creating client or receiver, " + - "retrying: %r", self.host.guid, self.partition_context.partition_id, err) + "%r,%r PartitionPumpWarning: Failure creating client or receiver, retrying: %r", + self.host.guid, self.partition_context.partition_id, err) last_exception = err _retry_count += 1 diff --git a/azure/eventprocessorhost/partition_context.py b/azure/eventprocessorhost/partition_context.py index 510fdd6..b33099e 100644 --- a/azure/eventprocessorhost/partition_context.py +++ b/azure/eventprocessorhost/partition_context.py @@ -121,7 +121,7 @@ async def persist_checkpoint_async(self, checkpoint): self.lease.offset = checkpoint.offset self.lease.sequence_number = checkpoint.sequence_number else: - _logger.error( + _logger.error( # pylint: disable=logging-not-lazy "Ignoring out of date checkpoint with offset %r/sequence number %r because " + "current persisted checkpoint has higher offset %r/sequence number %r", checkpoint.offset, diff --git a/azure/eventprocessorhost/partition_pump.py b/azure/eventprocessorhost/partition_pump.py index be8be04..cc2dcdc 100644 --- a/azure/eventprocessorhost/partition_pump.py +++ b/azure/eventprocessorhost/partition_pump.py @@ -143,7 +143,7 @@ async def process_events_async(self, events): # CloseAsync are protected by synchronizing too. try: last = events[-1] - if last != None: + if last is not None: self.partition_context.set_offset_and_sequence_number(last) await self.processor.process_events_async(self.partition_context, events) except Exception as err: # pylint: disable=broad-except diff --git a/dev_requirements.txt b/dev_requirements.txt index 3cbeb9a..31a0ba7 100644 --- a/dev_requirements.txt +++ b/dev_requirements.txt @@ -2,5 +2,5 @@ pytest>=3.4.1 pytest-asyncio>=0.8.0 docutils>=0.14 pygments>=2.2.0 -pylint==1.8.4 +pylint==2.1.1 behave==1.2.6 \ No newline at end of file diff --git a/examples/recv.py b/examples/recv.py index d2fbdf7..f43d03b 100644 --- a/examples/recv.py +++ b/examples/recv.py @@ -38,11 +38,15 @@ receiver = client.add_receiver(CONSUMER_GROUP, PARTITION, prefetch=5000, offset=OFFSET) client.run() start_time = time.time() - for event_data in receiver.receive(timeout=100): - last_offset = event_data.offset - last_sn = event_data.sequence_number - print("Received: {}, {}".format(last_offset.value, last_sn)) - total += 1 + batch = receiver.receive(timeout=5000) + while batch: + for event_data in batch: + last_offset = event_data.offset + last_sn = event_data.sequence_number + print("Received: {}, {}".format(last_offset.value, last_sn)) + print(event_data.body_as_str()) + total += 1 + batch = receiver.receive(timeout=5000) end_time = time.time() client.stop() diff --git a/pylintrc b/pylintrc index 7b3f956..6e495c8 100644 --- a/pylintrc +++ b/pylintrc @@ -6,7 +6,7 @@ reports=no # For all codes, run 'pylint --list-msgs' or go to 'https://pylint.readthedocs.io/en/latest/reference_guide/features.html' # locally-disabled: Warning locally suppressed using disable-msg # cyclic-import: because of https://github.com/PyCQA/pylint/issues/850 -disable=raising-bad-type,missing-docstring,locally-disabled,fixme,cyclic-import,too-many-arguments,invalid-name,duplicate-code,logging-format-interpolation,too-many-instance-attributes,too-few-public-methods +disable=useless-object-inheritance,raising-bad-type,missing-docstring,locally-disabled,fixme,cyclic-import,too-many-arguments,invalid-name,duplicate-code,logging-format-interpolation,too-many-instance-attributes,too-few-public-methods [FORMAT] max-line-length=120 diff --git a/setup.py b/setup.py index 891a80e..d95ac21 100644 --- a/setup.py +++ b/setup.py @@ -44,12 +44,13 @@ author_email='azpysdkhelp@microsoft.com', url='https://github.com/Azure/azure-event-hubs-python', classifiers=[ - 'Development Status :: 3 - Alpha', + 'Development Status :: 5 - Production/Stable', 'Programming Language :: Python', 'Programming Language :: Python :: 3', 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', 'License :: OSI Approved :: MIT License', ], zip_safe=False, diff --git a/tests/__init__.py b/tests/__init__.py index 7ec7d3b..7b7c91a 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,7 +12,7 @@ def get_logger(filename, level=logging.INFO): azure_logger = logging.getLogger("azure") azure_logger.setLevel(level) uamqp_logger = logging.getLogger("uamqp") - uamqp_logger.setLevel(logging.DEBUG) + uamqp_logger.setLevel(logging.INFO) formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s') console_handler = logging.StreamHandler(stream=sys.stdout) diff --git a/tests/test_eph.py b/tests/test_eph.py deleted file mode 100644 index c1d43e7..0000000 --- a/tests/test_eph.py +++ /dev/null @@ -1,17 +0,0 @@ -# -------------------------------------------------------------------------------------------- -# Copyright (c) Microsoft Corporation. All rights reserved. -# Licensed under the MIT License. See License.txt in the project root for license information. -# ----------------------------------------------------------------------------------- - -import asyncio -import pytest - - -def test_eph_start(eph): - """ - Test that the processing host starts correctly - """ - pytest.skip("Not working yet") - loop = asyncio.get_event_loop() - loop.run_until_complete(eph.open_async()) - loop.run_until_complete(eph.close_async()) diff --git a/tests/test_negative.py b/tests/test_negative.py index dbc8096..bdfcbfd 100644 --- a/tests/test_negative.py +++ b/tests/test_negative.py @@ -7,6 +7,7 @@ import os import asyncio import pytest +import time from azure import eventhub from azure.eventhub import ( @@ -303,4 +304,59 @@ async def test_max_receivers_async(connection_str, senders): assert len(failed) == 1 print(failed[0].message) finally: - await client.stop_async() \ No newline at end of file + await client.stop_async() + + +def test_message_body_types(connection_str, senders): + client = EventHubClient.from_connection_string(connection_str, debug=False) + receiver = client.add_receiver("$default", "0", offset=Offset('@latest')) + try: + client.run() + + received = receiver.receive(timeout=5) + assert len(received) == 0 + senders[0].send(EventData(b"Bytes Data")) + time.sleep(1) + received = receiver.receive(timeout=5) + assert len(received) == 1 + assert list(received[0].body) == [b'Bytes Data'] + assert received[0].body_as_str() == "Bytes Data" + with pytest.raises(TypeError): + received[0].body_as_json() + + senders[0].send(EventData("Str Data")) + time.sleep(1) + received = receiver.receive(timeout=5) + assert len(received) == 1 + assert list(received[0].body) == [b'Str Data'] + assert received[0].body_as_str() == "Str Data" + with pytest.raises(TypeError): + received[0].body_as_json() + + senders[0].send(EventData(b'{"test_value": "JSON bytes data", "key1": true, "key2": 42}')) + time.sleep(1) + received = receiver.receive(timeout=5) + assert len(received) == 1 + assert list(received[0].body) == [b'{"test_value": "JSON bytes data", "key1": true, "key2": 42}'] + assert received[0].body_as_str() == '{"test_value": "JSON bytes data", "key1": true, "key2": 42}' + assert received[0].body_as_json() == {"test_value": "JSON bytes data", "key1": True, "key2": 42} + + senders[0].send(EventData('{"test_value": "JSON str data", "key1": true, "key2": 42}')) + time.sleep(1) + received = receiver.receive(timeout=5) + assert len(received) == 1 + assert list(received[0].body) == [b'{"test_value": "JSON str data", "key1": true, "key2": 42}'] + assert received[0].body_as_str() == '{"test_value": "JSON str data", "key1": true, "key2": 42}' + assert received[0].body_as_json() == {"test_value": "JSON str data", "key1": True, "key2": 42} + + senders[0].send(EventData(42)) + time.sleep(1) + received = receiver.receive(timeout=5) + assert len(received) == 1 + assert received[0].body_as_str() == "42" + with pytest.raises(ValueError): + received[0].body + except: + raise + finally: + client.stop() \ No newline at end of file diff --git a/tests/test_receive.py b/tests/test_receive.py index 1b7480e..1bdbe86 100644 --- a/tests/test_receive.py +++ b/tests/test_receive.py @@ -24,6 +24,7 @@ def test_receive_end_of_stream(connection_str, senders): received = receiver.receive(timeout=5) assert len(received) == 1 + assert received[0].body_as_str() == "Receiving only a single event" assert list(received[-1].body)[0] == b"Receiving only a single event" except: raise @@ -48,6 +49,9 @@ def test_receive_with_offset_sync(connection_str, senders): assert len(received) == 1 offset = received[0].offset + assert list(received[0].body) == [b'Data'] + assert received[0].body_as_str() == "Data" + offset_receiver = client.add_receiver("$default", "0", offset=offset) client.run() received = offset_receiver.receive(timeout=5) @@ -75,6 +79,9 @@ def test_receive_with_inclusive_offset(connection_str, senders): assert len(received) == 1 offset = received[0].offset + assert list(received[0].body) == [b'Data'] + assert received[0].body_as_str() == "Data" + offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset.value, inclusive=True)) client.run() received = offset_receiver.receive(timeout=5) @@ -101,6 +108,9 @@ def test_receive_with_datetime(connection_str, senders): assert len(received) == 1 offset = received[0].enqueued_time + assert list(received[0].body) == [b'Data'] + assert received[0].body_as_str() == "Data" + offset_receiver = client.add_receiver("$default", "0", offset=Offset(offset)) client.run() received = offset_receiver.receive(timeout=5) diff --git a/tests/test_reconnect.py b/tests/test_reconnect.py index a6aa0ce..bd10bbd 100644 --- a/tests/test_reconnect.py +++ b/tests/test_reconnect.py @@ -85,6 +85,15 @@ def test_send_with_forced_conn_close_sync(connection_str, receivers): assert list(received[0].body)[0] == b"A single event" +def pump(receiver): + messages = [] + batch = receiver.receive(timeout=1) + messages.extend(batch) + while batch: + batch = receiver.receive(timeout=1) + messages.extend(batch) + return messages + @pytest.mark.asyncio async def test_send_with_forced_conn_close_async(connection_str, receivers): #pytest.skip("long running") @@ -106,7 +115,7 @@ async def test_send_with_forced_conn_close_async(connection_str, receivers): received = [] for r in receivers: - received.extend(r.receive(timeout=1)) + received.extend(pump(r)) assert len(received) == 5 assert list(received[0].body)[0] == b"A single event"