
[BUG] Data loss and data duplication when the Pulsar reader experiences network connectivity issues #181

Open
neilramaswamy opened this issue Nov 7, 2024 · 1 comment

neilramaswamy commented Nov 7, 2024

Describe the bug

I have observed that the Pulsar Spark connector occasionally drops data and duplicates data. This seems to happen when the underlying Pulsar client has connectivity issues with the Pulsar broker.

To Reproduce

I have a unit test that writes 10 numbers, 11 through 20, to a Pulsar topic, and then attempts to read back all the data from that topic using Structured Streaming. Occasionally, the test reads more than 10 messages and, less frequently, fewer than 10 messages. For example, this very simple test has returned:

  == Results ==
  !== Correct Answer - 10 ==   == Spark Answer - 11 ==
   struct<value:string>        struct<value:string>
   [11]                        [11]
  ![12]                        [11]
  ![13]                        [12]
  ![14]                        [13]
  ![15]                        [14]
  ![16]                        [15]
  ![17]                        [16]
  ![18]                        [17]
  ![19]                        [18]
  ![20]                        [19]
  !                            [20]

This doesn't always happen; it's flaky. To reproduce it, you might have to run the test hundreds of times in an environment that can have network slowdowns/failures. I ran it in an internal cloud environment.
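
For reference, here is a minimal sketch of the kind of test described above. It is not the exact test code: the topic name, broker/admin URLs, and the connector options used (`service.url`, `admin.url`, `topic`, `startingOffsets`) are assumptions for illustration.

```scala
// Sketch only: writes 11..20 to a topic with the Pulsar client, then reads the
// topic back through the pulsar-spark connector and checks that exactly those
// 10 rows come back. URLs, topic name, and options are illustrative.
import org.apache.pulsar.client.api.{PulsarClient, Schema}
import org.apache.spark.sql.SparkSession

object ReadBackRepro {
  def main(args: Array[String]): Unit = {
    val serviceUrl = "pulsar://localhost:6650" // assumed local broker
    val adminUrl   = "http://localhost:8080"   // assumed admin endpoint
    val topic      = "persistent://public/default/topic1"

    // Produce exactly 10 messages: 11 through 20.
    val client   = PulsarClient.builder().serviceUrl(serviceUrl).build()
    val producer = client.newProducer(Schema.STRING).topic(topic).create()
    (11 to 20).foreach(n => producer.send(n.toString))
    producer.close()
    client.close()

    // Read the topic back via Structured Streaming into an in-memory table.
    val spark = SparkSession.builder().master("local[2]").appName("repro").getOrCreate()
    val df = spark.readStream
      .format("pulsar")
      .option("service.url", serviceUrl)
      .option("admin.url", adminUrl)
      .option("topic", topic)
      .option("startingOffsets", "earliest")
      .load()

    val query = df.selectExpr("CAST(value AS STRING) AS value")
      .writeStream
      .format("memory")
      .queryName("readback")
      .start()

    query.processAllAvailable()
    val rows = spark.sql("SELECT value FROM readback").collect().map(_.getString(0)).sorted
    assert(rows.toSeq == (11 to 20).map(_.toString),
      s"Expected exactly 10 rows (11..20), got ${rows.length}: ${rows.mkString(",")}")
    query.stop()
    spark.stop()
  }
}
```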

Expected behavior

The connector should always read back exactly 10 messages.

Additional context

I added logs to the pulsar-spark library and re-ran my test hundreds of times. I then saw the following logs:

24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Reading currentMessage from the reader
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Got currentMessage=11
24/11/05 02:21:11 INFO PulsarSourceUtils: Returning false for enteredEnd with end=1:12:-1 and current=1:10:-1:0
24/11/05 02:21:11 INFO ConnectionHandler: [persistent://public/default/topic1] [spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Reconnecting after timeout
24/11/05 02:21:11 INFO ConsumerImpl: [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Subscribing to topic on cnx [id: 0x456bd5bc, L:/127.0.0.1:55998 - R:localhost/127.0.0.1:32803], consumerId 1
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,196+0000 [pulsar-io-19-13] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:37088] Subscribing on topic persistent://public/default/topic1 / spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,196+0000 [pulsar-io-19-13] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Creating non-durable subscription at msg id 1:10:-1:-1 - {}
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,198+0000 [pulsar-io-19-13] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic1-spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Rewind from 1:10 to 1:10
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,199+0000 [pulsar-io-19-13] INFO  org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:37088] Created subscription on topic persistent://public/default/topic1 / spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4
24/11/05 02:21:11 INFO ConsumerImpl: [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Subscribed to topic on localhost/127.0.0.1:32803 -- consumer: 1
24/11/05 02:21:11 INFO CodeGenerator: Code generated in 33.914034 ms
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Reading currentMessage from the reader
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Got currentMessage=11

The logs that start with [DBG] are my own; I added them at the end of getNext in PulsarSourceRDDBase. You'll notice that record 11 is read, the Pulsar client then prints that it is Reconnecting after timeout, and record 11 is read again. I have also noticed the original read returning arbitrary values, like 13.
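
To make the symptom concrete, here is a standalone sketch of the kind of read loop those [DBG] logs trace. This is not the connector's getNext; the broker URL, topic, and timeout are illustrative. With a healthy connection the message IDs only move forward; around a silent reconnect/re-seek, the same ID can come back again, which is the duplication shown above.

```scala
import java.util.concurrent.TimeUnit
import org.apache.pulsar.client.api.{MessageId, PulsarClient, Schema}

object ReaderLoopSketch {
  def main(args: Array[String]): Unit = {
    val client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()
    val reader = client.newReader(Schema.STRING)
      .topic("persistent://public/default/topic1")
      .startMessageId(MessageId.earliest)
      .create()

    // Track the last message ID seen; a non-increasing ID means the reader
    // has rewound and is re-delivering data it already handed back.
    var lastId: MessageId = null
    var msg = reader.readNext(1, TimeUnit.SECONDS)
    while (msg != null) {
      println(s"[DBG] Got currentMessage=${msg.getValue} id=${msg.getMessageId}")
      if (lastId != null && msg.getMessageId.compareTo(lastId) <= 0) {
        println(s"[DBG] Rewound/duplicate delivery: ${msg.getMessageId} <= $lastId")
      }
      lastId = msg.getMessageId
      msg = reader.readNext(1, TimeUnit.SECONDS)
    }
    reader.close()
    client.close()
  }
}
```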

I dug a little into the Pulsar codebase and found that there are known issues around the Pulsar reader's seek method; see this issue for example. This other issue has the same symptom that I am seeing here. I believe that this PR fixes the underlying issue, so my recommendation would be to upgrade the Pulsar client version used by this library to one that includes the fix, i.e. ^3.0.4.
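
If the client version is pinned through a property in the project's pom.xml, the bump would look roughly like the following; the property name here is an assumption on my part, not taken from the repo.

```xml
<!-- Sketch only: assumes the Pulsar client version is controlled by a
     pulsar.version property in pom.xml. -->
<properties>
  <pulsar.version>3.0.4</pulsar.version>
</properties>
```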

ericm-db (Contributor) commented Nov 7, 2024

Addressing this here: #180

@neilramaswamy neilramaswamy changed the title [BUG] Data loss and data deduplication when the Pulsar reader experiences network connectivity issues [BUG] Data loss and data duplication when the Pulsar reader experiences network connectivity issues Nov 11, 2024