Describe the bug
I have observed that the Pulsar Spark connector occasionally drops and duplicates data. This appears to happen when the underlying Pulsar client has connectivity issues with the Pulsar broker.
To Reproduce
I have a unit test that writes 10 numbers, 11 through 20, to a Pulsar topic and then attempts to read all of them back from that topic using Structured Streaming. Occasionally the test reads more than 10 messages and, less frequently, fewer than 10. For example, this very simple test has returned:
This doesn't always happen; the test is flaky. To reproduce it, you might have to run it hundreds of times in an environment prone to network slowdowns or failures. I ran it in an internal cloud environment.
Expected behavior
The connector should always read back exactly 10 messages.
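To make the pass/fail criterion concrete, here is a sketch of a result check. It is not the author's actual test code. It counts how many times each of the values 11 through 20 appears in the read-back output and reports any value that is missing (data loss), repeated (data duplication), or outside the written range. The names `ReadBackCheck`, `check`, and `countFailures` are hypothetical. The `Supplier` passed to `countFailures` stands in for one run of the Structured Streaming read, which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;

// Hypothetical result check for the reproduction test: values first..last are
// written once each, so each should be read back exactly once.
final class ReadBackCheck {

    /** Describes every problem found; an empty list means the read was exact. */
    static List<String> check(List<Integer> readBack, int first, int last) {
        // Count occurrences of each value (TreeMap keeps the report ordered).
        Map<Integer, Integer> counts = new TreeMap<>();
        for (int v : readBack) {
            counts.merge(v, 1, Integer::sum);
        }
        List<String> problems = new ArrayList<>();
        for (int v = first; v <= last; v++) {
            int n = counts.getOrDefault(v, 0);
            if (n == 0) {
                problems.add("missing " + v);                // data loss
            } else if (n > 1) {
                problems.add("duplicated " + v + " x" + n);  // data duplication
            }
        }
        // Values outside the written range should never appear.
        for (int v : counts.keySet()) {
            if (v < first || v > last) {
                problems.add("unexpected " + v);
            }
        }
        return problems;
    }

    /** Runs a trial (e.g. one streaming read) many times and counts failed checks. */
    static int countFailures(Supplier<List<Integer>> trial, int runs, int first, int last) {
        int failures = 0;
        for (int i = 0; i < runs; i++) {
            if (!check(trial.get(), first, last).isEmpty()) {
                failures++;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Illustrative inputs only, not results from the actual test.
        List<Integer> exact = List.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
        List<Integer> withDuplicate = List.of(11, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
        List<Integer> withLoss = List.of(13, 14, 15, 16, 17, 18, 19, 20);
        System.out.println(check(exact, 11, 20));          // []
        System.out.println(check(withDuplicate, 11, 20));  // [duplicated 11 x2]
        System.out.println(check(withLoss, 11, 20));       // [missing 11, missing 12]
        System.out.println(countFailures(() -> exact, 100, 11, 20)); // 0
    }
}
```

The check ignores ordering, so it flags only loss and duplication. Because the failure is intermittent, `countFailures` is a natural way to report how often it occurs over hundreds of runs.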
Additional context
I added logs to the pulsar-spark library and re-ran my test hundreds of times. I then saw the following logs:
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Reading currentMessage from the reader
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Got currentMessage=11
24/11/05 02:21:11 INFO PulsarSourceUtils: Returning false for enteredEnd with end=1:12:-1 and current=1:10:-1:0
24/11/05 02:21:11 INFO ConnectionHandler: [persistent://public/default/topic1] [spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Reconnecting after timeout
24/11/05 02:21:11 INFO ConsumerImpl: [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Subscribing to topic on cnx [id: 0x456bd5bc, L:/127.0.0.1:55998 - R:localhost/127.0.0.1:32803], consumerId 1
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,196+0000 [pulsar-io-19-13] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:37088] Subscribing on topic persistent://public/default/topic1 / spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,196+0000 [pulsar-io-19-13] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Creating non-durable subscription at msg id 1:10:-1:-1 - {}
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,198+0000 [pulsar-io-19-13] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/topic1-spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Rewind from 1:10 to 1:10
24/11/05 02:21:11 INFO pulsar-spark-test-logger: 2024-11-05T02:21:11,199+0000 [pulsar-io-19-13] INFO org.apache.pulsar.broker.service.ServerCnx - [/172.17.0.1:37088] Created subscription on topic persistent://public/default/topic1 / spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4
24/11/05 02:21:11 INFO ConsumerImpl: [persistent://public/default/topic1][spark-pulsar-2b74618d-ff6a-421e-8468-afdf7be846ec-reader-0f5dc824b4] Subscribed to topic on localhost/127.0.0.1:32803 -- consumer: 1
24/11/05 02:21:11 INFO CodeGenerator: Code generated in 33.914034 ms
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Reading currentMessage from the reader
24/11/05 02:21:11 INFO PulsarSourceRDD: [DBG] Got currentMessage=11
The logs that start with [DBG] are my own; I added them at the end of getNext in PulsarSourceRDDBase. Notice that record 11 is read, the Pulsar client then logs that it is Reconnecting after timeout, and record 11 is read again. I have also observed the initial read returning arbitrary values, such as 13.
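The broker log above is consistent with a simple explanation. After 11 (message id 1:10:-1:0) is delivered, the non-durable subscription is re-created at 1:10:-1:-1, the same entry, so that entry can be delivered a second time. The toy model below illustrates this reading under explicit assumptions: a single ledger, one message per entry, no batching, and a hypothetical `resumeInclusive` flag that chooses whether the reader resumes at the last delivered entry or just after it. It is not Pulsar's reconnect logic, and it does not model the data-loss case.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, NOT Pulsar's reconnect code: a reader over a single ledger with
// one message per entry and no batching. Its position is the next entry id.
final class ReconnectModel {
    private final List<Integer> entries; // value stored at each entry id
    private int next;                     // next entry id to deliver
    private int lastDelivered = -1;       // entry id of the last delivered message

    ReconnectModel(List<Integer> entries, int start) {
        this.entries = entries;
        this.next = start;
    }

    boolean hasNext() {
        return next < entries.size();
    }

    int read() {
        lastDelivered = next;
        return entries.get(next++);
    }

    // Re-create the subscription after a timeout. Resuming AT the last delivered
    // entry (inclusive) re-delivers it; resuming just after it does not.
    void reconnect(boolean resumeInclusive) {
        if (lastDelivered >= 0) {
            next = resumeInclusive ? lastDelivered : lastDelivered + 1;
        }
    }

    // Reads everything, forcing a single reconnect after `reconnectAfter` reads.
    static List<Integer> readAll(List<Integer> entries, int reconnectAfter, boolean resumeInclusive) {
        ReconnectModel reader = new ReconnectModel(entries, 0);
        List<Integer> out = new ArrayList<>();
        while (reader.hasNext()) {
            out.add(reader.read());
            if (out.size() == reconnectAfter) {
                reader.reconnect(resumeInclusive);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> topic = List.of(11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
        System.out.println(readAll(topic, 1, true));  // [11, 11, 12, ..., 20]
        System.out.println(readAll(topic, 1, false)); // [11, 12, ..., 20]
    }
}
```

With `resumeInclusive` set to true, a reconnect after the first read yields 11 twice, which mirrors the duplicated 11 in the logs. With it set to false, each value is read exactly once.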
I dug into the Pulsar codebase and found that there are known issues around the Pulsar reader's seek method; see this issue, for example. This other issue has the same symptom I am seeing here. I believe this PR fixes the underlying issue, so I recommend upgrading this library's Pulsar client to a version that includes the fix, i.e. ^3.0.4.
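If `^3.0.4` is read in the usual semver caret sense, it admits any client version of at least 3.0.4 and below 4.0.0. The sketch below encodes only that notation. `CaretRange` is a hypothetical helper that accepts plain `major.minor.patch` strings with a major version of at least 1. Whether a given 3.x release actually contains the fix should still be confirmed against the linked PR.

```java
// Hypothetical helper for the caret range "^X.Y.Z" with X >= 1, meaning
// >= X.Y.Z and < (X+1).0.0. Only plain "major.minor.patch" strings are handled;
// pre-release or build suffixes such as "-SNAPSHOT" are out of scope.
final class CaretRange {

    static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected major.minor.patch: " + version);
        }
        int[] out = new int[3];
        for (int i = 0; i < 3; i++) {
            out[i] = Integer.parseInt(parts[i]);
        }
        return out;
    }

    // Lexicographic comparison of (major, minor, patch).
    static int compare(int[] a, int[] b) {
        for (int i = 0; i < 3; i++) {
            if (a[i] != b[i]) {
                return Integer.compare(a[i], b[i]);
            }
        }
        return 0;
    }

    // Same major version as the floor, and not older than the floor.
    static boolean satisfiesCaret(String version, String floor) {
        int[] v = parse(version);
        int[] f = parse(floor);
        return v[0] == f[0] && compare(v, f) >= 0;
    }

    public static void main(String[] args) {
        for (String v : new String[] {"2.10.4", "3.0.3", "3.0.4", "3.3.1", "4.0.0"}) {
            System.out.println(v + " satisfies ^3.0.4: " + satisfiesCaret(v, "3.0.4"));
        }
    }
}
```

Under this reading, 3.0.4 and 3.3.1 fall inside the range, while 2.10.4, 3.0.3, and 4.0.0 fall outside it.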
neilramaswamy changed the title from "[BUG] Data loss and data deduplication when the Pulsar reader experiences network connectivity issues" to "[BUG] Data loss and data duplication when the Pulsar reader experiences network connectivity issues" on Nov 11, 2024.