Description
I'm using PySpark to write to Event Hubs with this project (azure-event-hubs-spark). I'm testing locally against the new Event Hubs emulator, https://github.com/Azure/azure-event-hubs-emulator-installer, but I get a com.microsoft.azure.eventhubs.CommunicationException: Connection refused error when I use it with PySpark. The same emulator works when I use the Python Azure Event Hubs packages.
Actual behavior
I get the error
Traceback (most recent call last):
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/send_via_spark.py", line 76, in <module>
    serialized_rows.select("body").write.format("eventhubs").options(**ehConf).save()
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o86.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 4.0 failed 1 times, most recent failure: Lost task 6.0 in stage 4.0 (TID 26) (192.168.68.105 executor driver): com.microsoft.azure.eventhubs.CommunicationException: Connection refused
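Because the failure is a plain "Connection refused", one way to narrow it down is to check which local ports actually accept TCP connections. The sketch below is only a diagnostic aid. The port numbers 5672 (plain AMQP) and 5671 (AMQP over TLS) are the standard AMQP ports and are assumptions about this setup, not something the traceback confirms, and `is_port_open` is a hypothetical helper name.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # Assumed ports: 5672 = AMQP, 5671 = AMQP over TLS.
    for port in (5672, 5671):
        state = "open" if is_port_open("localhost", port) else "closed/refused"
        print(f"localhost:{port} -> {state}")
```

If only one of the two ports turns out to be open, the client that fails may be trying the other one; that would be worth ruling out before looking elsewhere.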
Expected behavior
I should be able to change the connection string and consumer group and it should work as if it is a remote/hosted/real eventhub.
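For reference, the emulator connection string used in the scripts is a list of `key=value` pairs separated by semicolons. A minimal sketch of splitting it apart, assuming that simple format; `parse_connection_string` is a hypothetical helper for illustration and not part of either library:

```python
def parse_connection_string(conn_str: str) -> dict[str, str]:
    """Split a 'Key=Value;Key=Value;' connection string into a dict."""
    parts = {}
    for segment in conn_str.split(";"):
        if not segment.strip():
            continue  # skip the empty piece after a trailing ';'
        # Split on the first '=' only, since key values may contain '='.
        key, _, value = segment.partition("=")
        parts[key.strip()] = value.strip()
    return parts


conn = (
    "Endpoint=sb://localhost;"
    "SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=SAS_KEY_VALUE;"
    "UseDevelopmentEmulator=true;"
)
parts = parse_connection_string(conn)
print(parts["Endpoint"])                    # sb://localhost
print(parts.get("UseDevelopmentEmulator"))  # true
```

The `UseDevelopmentEmulator=true` pair is the part that differs from a hosted namespace connection string. Whether the client underneath the Spark connector recognizes that flag is not established here.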
Run listen.py in one terminal.
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

EVENT_HUB_CONNECTION_STR = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
EVENT_HUB_NAME = "eh1"
CONSUMER_GROUP = "cg1"

async def on_event(partition_context, event):
    # Print the event data.
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )

async def main():
    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
    )
    async with client:
        # Call the receive method. Read from the beginning of the
        # partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())
Run send.py in another terminal.
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

EVENT_HUB_CONNECTION_STR = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
EVENT_HUB_NAME = "eh1"

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()
        # Add events to the batch.
        event_data_batch.add(EventData("First event "))
        event_data_batch.add(EventData("Second event"))
        event_data_batch.add(EventData("Third event"))
        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

asyncio.run(run())
In the listen.py terminal I receive events
> python listen.py
Received the event: "First event " from the partition with ID: "0"
Received the event: "Second event" from the partition with ID: "0"
Received the event: "Third event" from the partition with ID: "0"
Versions
Python: 3.10.13
Python virtualenv (pip freeze):
azure-event-hubs-spark: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22
Reproduce
Working using azure eventhub python package
I use 3 scripts to test.
Run listen.py in one terminal.
Run send.py in another terminal.
In the listen.py terminal I receive events.
Not working using pyspark and azure-event-hubs-spark
Run the listen.py script in one terminal.
Run send_via_spark.py in another terminal.
In the listen.py terminal I receive nothing, and in the send_via_spark.py terminal I get a rather lengthy error message (error.log).