
pyspark - azure eventhub emulator - com.microsoft.azure.eventhubs.CommunicationException: Connection refused #688

Open
smpurkis opened this issue Aug 29, 2024 · 0 comments

Description

I'm using pyspark to write to Event Hubs via this project. For local testing I'm using the new local Event Hubs emulator, https://github.com/Azure/azure-event-hubs-emulator-installer. But when I try to use the emulator with pyspark, I get a com.microsoft.azure.eventhubs.CommunicationException: Connection refused error. The same emulator works fine with the Python azure-eventhub package.

Actual behavior

I get the error

Traceback (most recent call last):
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/send_via_spark.py", line 76, in <module>
    serialized_rows.select("body").write.format("eventhubs").options(**ehConf).save()
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
  File "/Users/user/Projects/azure-event-hubs-emulator-installer/.venv/lib/python3.10/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o86.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 6 in stage 4.0 failed 1 times, most recent failure: Lost task 6.0 in stage 4.0 (TID 26) (192.168.68.105 executor driver): com.microsoft.azure.eventhubs.CommunicationException: Connection refused

Expected behavior

I should be able to change only the connection string and consumer group and have it work as if it were a remote/hosted/real Event Hub.
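A "Connection refused" error usually means nothing is listening on the port the client dials. One way to narrow this down is to check which AMQP ports are reachable from the machine running the Spark driver. This is a diagnostic sketch, not part of either library; it assumes the standard AMQP ports (5671 for AMQPS/TLS, which hosted namespaces use, and 5672 for plain AMQP, which the emulator is expected to listen on):

```python
import socket


def port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Hosted Event Hubs uses AMQPS on 5671; the emulator is expected on plain AMQP 5672.
for port in (5671, 5672):
    print(f"localhost:{port} open: {port_open('localhost', port)}")
```

If 5672 is open but 5671 is not, a client that insists on TLS/5671 (as the JVM-side connector may, since the connection string's UseDevelopmentEmulator flag may not be understood by the older com.microsoft.azure.eventhubs Java client) would fail exactly this way, while the Python azure-eventhub client, which honors the flag, succeeds.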

Versions

Python: 3.10.13
Python virtualenv (pip freeze):

aiohappyeyeballs==2.4.0
aiohttp==3.10.5
aiosignal==1.3.1
async-timeout==4.0.3
attrs==24.2.0
azure-core==1.30.2
azure-eventhub==5.12.1
azure-eventhub-checkpointstoreblob-aio==1.1.4
certifi==2024.7.4
cffi==1.17.0
charset-normalizer==3.3.2
cryptography==43.0.0
frozenlist==1.4.1
idna==3.8
isodate==0.6.1
msrest==0.7.1
multidict==6.0.5
numpy==2.1.0
oauthlib==3.2.2
pandas==2.2.2
py4j==0.10.9.7
pycparser==2.22
pyspark==3.5.2
python-dateutil==2.9.0.post0
pytz==2024.1
requests==2.32.3
requests-oauthlib==2.0.0
six==1.16.0
typing_extensions==4.12.2
tzdata==2024.1
urllib3==2.2.2
wrapt==1.16.0
yarl==1.9.4

azure-event-hubs-spark: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22

Reproduce

Working using azure eventhub python package

I use 3 scripts to test.

  1. Get the local eventhub emulator running, following the instructions in the https://github.com/Azure/azure-event-hubs-emulator-installer repo.
  2. Run listen.py in one terminal.
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

EVENT_HUB_CONNECTION_STR = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
EVENT_HUB_NAME = "eh1"
CONSUMER_GROUP = "cg1"


async def on_event(partition_context, event):
    # Print the event data.
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )


async def main():

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group=CONSUMER_GROUP,
        eventhub_name=EVENT_HUB_NAME,
    )
    async with client:
        # Call the receive method. Read from the beginning of the
        # partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())
  3. Run send.py in another terminal.
import asyncio

from azure.eventhub import EventData
from azure.eventhub.aio import EventHubProducerClient

EVENT_HUB_CONNECTION_STR = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
EVENT_HUB_NAME = "eh1"


async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(
        conn_str=EVENT_HUB_CONNECTION_STR, eventhub_name=EVENT_HUB_NAME
    )
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData("First event "))
        event_data_batch.add(EventData("Second event"))
        event_data_batch.add(EventData("Third event"))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)


asyncio.run(run())
  4. In the listen.py terminal I receive the events:
> python listen.py
Received the event: "First event " from the partition with ID: "0"
Received the event: "Second event" from the partition with ID: "0"
Received the event: "Third event" from the partition with ID: "0"

Not working using pyspark and azure-event-hubs-spark

  1. Get the local eventhub emulator running, following the instructions in the https://github.com/Azure/azure-event-hubs-emulator-installer repo.
  2. Run the same listen.py script in one terminal.
  3. Run send_via_spark.py in another terminal.
from pyspark.sql import SparkSession
from pyspark.sql.functions import struct, to_json
from pyspark.sql.types import StructField, StructType, StringType


EVENT_HUB_NAME = "eh1"

EVENT_HUB_CONNECTION_STR = f"Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath={EVENT_HUB_NAME}"


spark = (
    SparkSession.builder.appName("EventHubs")
    .config(
        "spark.jars.packages",
        "com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22,org.apache.hadoop:hadoop-azure:3.4.0",  # noqa
    )
    .config("spark.sql.shuffle.partitions", 5)
    .config("spark.hadoop.fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
    .config("spark.hadoop.fs.azure.account.auth.type", "OAuth")
    .config(
        "spark.hadoop.fs.azure.account.oauth.provider.type",
        "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",  # noqa
    )
    .config("spark.hadoop.fs.azure.test.emulator", "true")
    .config(
        "fs.azure.storage.emulator.account.name",
        "devstoreaccount1.blob.windows.core.net",
    )
    .config(
        "spark.hadoop.fs.azure.storage.emulator.account.name",
        "devstoreaccount1.blob.windows.core.net",
    )
    .getOrCreate()
)


ehConf = {
    "eventhubs.connectionString": spark.sparkContext._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(
        EVENT_HUB_CONNECTION_STR
    )
}


# ehConf["eventhubs.consumerGroup"] = "$Default"
ehConf["eventhubs.consumerGroup"] = "cg1"

df = spark.createDataFrame(
    [
        ("1234", "1", "2021-01-01 00:00:00"),
        ("1234", "2", "2021-01-01 00:00:01"),
        ("1234", "3", "2021-01-01 00:00:02"),
    ],
    ["cardNumber", "transactionId", "transactionTime"],
)

Schema = StructType(
    [
        StructField("cardNumber", StringType(), True),
        StructField("transactionId", StringType(), True),
        StructField("transactionTime", StringType(), True),
    ]
)

serialized_rows = df.select(to_json(struct([df[x] for x in df.columns])).alias("body"))

# print(serialized_rows.select("body").toPandas())
serialized_rows.show(truncate=False)
print([r["body"] for r in serialized_rows.select("body").collect()])

print(ehConf)
serialized_rows.select("body").write.format("eventhubs").options(**ehConf).save()
  4. In the listen.py terminal I receive nothing, and in the send_via_spark.py terminal I get a rather lengthy error message:
    error.log
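For what it's worth, the Spark connector's documentation expects EntityPath to be present in the connection string (which the script above does include). A small sanity check along these lines can rule out a malformed string before handing it to EventHubsUtils.encrypt. The helper below is hypothetical, not part of azure-eventhubs-spark:

```python
def check_eventhubs_conn_str(conn_str: str) -> list[str]:
    """Return the list of required fields missing from an Event Hubs connection string."""
    # Parse "Key=Value;Key=Value;..." pairs; values may themselves contain '='.
    parts = {}
    for segment in conn_str.strip().strip(";").split(";"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            parts[key] = value
    required = ("Endpoint", "SharedAccessKeyName", "SharedAccessKey", "EntityPath")
    return [field for field in required if field not in parts]


# Checking the emulator-style string from the repro script above:
conn = (
    "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;"
    "SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;EntityPath=eh1"
)
print(check_eventhubs_conn_str(conn))  # → []
```

An empty result means all the fields the connector needs are present, which points the investigation back at the transport level (which port/TLS mode the JVM client dials) rather than at the string itself.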