Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Fixed some tests + samples
Browse files Browse the repository at this point in the history
  • Loading branch information
annatisch committed Jul 26, 2018
1 parent 35e1a67 commit 0f5ddda
Show file tree
Hide file tree
Showing 9 changed files with 15 additions and 22 deletions.
10 changes: 2 additions & 8 deletions azure/eventhub/_async/sender_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,9 @@ async def open_async(self):
"""
if self.redirected:
self.target = self.redirected.address
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
self._handler = SendClientAsync(
self.target,
auth=self.client.get_auth(**alt_creds),
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=Sender.TIMEOUT,
error_policy=self.retry_policy,
Expand All @@ -80,13 +77,10 @@ async def reconnect_async(self):
a retryable error - attempt to reconnect."""
pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent)
unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states]
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
await self._handler.close_async()
self._handler = SendClientAsync(
self.target,
auth=self.client.get_auth(**alt_creds),
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=Sender.TIMEOUT,
error_policy=self.retry_policy,
Expand Down
10 changes: 2 additions & 8 deletions azure/eventhub/sender.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,9 @@ def open(self): #, connection):
"""
if self.redirected:
self.target = self.redirected.address
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
self._handler = SendClient(
self.target,
auth=self.client.get_auth(**alt_creds),
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=Sender.TIMEOUT,
error_policy=self.retry_policy,
Expand All @@ -77,13 +74,10 @@ def reconnect(self):
a retryable error - attempt to reconnect."""
pending_states = (constants.MessageState.WaitingForSendAck, constants.MessageState.WaitingToBeSent)
unsent_events = [e for e in self._handler._pending_messages if e.state in pending_states]
alt_creds = {
"username": self.client._auth_config.get("iot_username"),
"password":self.client._auth_config.get("iot_password")}
self._handler.close()
self._handler = SendClient(
self.target,
auth=self.client.get_auth(**alt_creds),
auth=self.client.get_auth(),
debug=self.client.debug,
msg_timeout=Sender.TIMEOUT,
error_policy=self.retry_policy,
Expand Down
4 changes: 2 additions & 2 deletions azure/eventprocessorhost/partition_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def set_offset_and_sequence_number(self, event_data):
"""
if not event_data:
raise Exception(event_data)
self.offset = event_data.offset
self.offset = event_data.offset.value
self.sequence_number = event_data.sequence_number

async def get_initial_offset_async(self): # throws InterruptedException, ExecutionException
Expand Down Expand Up @@ -84,7 +84,7 @@ async def checkpoint_async_event_data(self, event_data):
raise ValueError("Argument Out Of Range event_data x-opt-sequence-number")

await self.persist_checkpoint_async(Checkpoint(self.partition_id,
event_data.offset,
event_data.offset.value,
event_data.sequence_number))

def to_string(self):
Expand Down
2 changes: 1 addition & 1 deletion examples/recv.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
for event_data in receiver.receive(timeout=100):
last_offset = event_data.offset
last_sn = event_data.sequence_number
print("Received: {}, {}".format(last_offset, last_sn))
print("Received: {}, {}".format(last_offset.value, last_sn))
total += 1

end_time = time.time()
Expand Down
2 changes: 1 addition & 1 deletion examples/recv_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async def pump(client, partition):
for event_data in await receiver.receive(timeout=10):
last_offset = event_data.offset
last_sn = event_data.sequence_number
print("Received: {}, {}".format(last_offset, last_sn))
print("Received: {}, {}".format(last_offset.value, last_sn))
total += 1
end_time = time.time()
run_time = end_time - start_time
Expand Down
2 changes: 1 addition & 1 deletion examples/recv_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
client.run()
batched_events = receiver.receive(max_batch_size=10)
for event_data in batched_events:
last_offset = event_data.offset
last_offset = event_data.offset.value
last_sn = event_data.sequence_number
total += 1
print("Partition {}, Received {}, sn={} offset={}".format(
Expand Down
2 changes: 2 additions & 0 deletions tests/test_iothub_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ def test_iothub_receive_sync(iot_connection_str, device_id):
receiver = client.add_receiver("$default", "0", operation='/messages/events')
try:
client.run()
partitions = client.get_eventhub_info()
assert partitions["partition_ids"] == ["0", "1", "2", "3"]
received = receiver.receive(timeout=5)
assert len(received) == 0
finally:
Expand Down
1 change: 0 additions & 1 deletion tests/test_iothub_send.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ def test_iothub_send_single_event(iot_connection_str, device_id):
sender = client.add_sender(operation='/messages/devicebound')
try:
client.run()
partitions = client.get_eventhub_info()
outcome = sender.send(EventData(b"A single event", to_device=device_id))
assert outcome.value == 0
except:
Expand Down
4 changes: 4 additions & 0 deletions tests/test_receive.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ def test_receive_end_of_stream(connection_str, senders):

def test_receive_with_offset_sync(connection_str, senders):
client = EventHubClient.from_connection_string(connection_str, debug=False)
partitions = client.get_eventhub_info()
assert partitions["partition_ids"] == ["0", "1"]
receiver = client.add_receiver("$default", "0", offset=Offset('@latest'))
try:
client.run()
more_partitions = client.get_eventhub_info()
assert more_partitions["partition_ids"] == ["0", "1"]

received = receiver.receive(timeout=5)
assert len(received) == 0
Expand Down

0 comments on commit 0f5ddda

Please sign in to comment.