From 2aaa01161a2c649c1a9139364911b35a4c681a9e Mon Sep 17 00:00:00 2001 From: Ali Poursamadi Date: Fri, 1 Nov 2024 15:05:43 -0700 Subject: [PATCH] better handling of empty returns from poll --- .../table/VenicePubsubInputPartitionReader.java | 12 ++++++++---- .../VenicePubsubInputPartitionReaderTest.java | 14 +++++++------- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java index 1ffeb2a4994..6d10e0636ce 100644 --- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java +++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java @@ -64,7 +64,6 @@ public class VenicePubsubInputPartitionReader implements PartitionReader>> consumerBuffer = new HashMap<>(); // the buffer that holds the relevant messages for the current partition - private List> partitionMessagesBuffer = new ArrayList<>(); public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsubInputPartition inputPartition) { this( @@ -119,7 +118,8 @@ public VenicePubsubInputPartitionReader( } private void initialize() { - next(); // get the first record ready to go. + next();// get the first record ready to go. + recordsServed = 0; // reset the counter } // if it returns a row, it's going to be key and value and offset in the row in that order @@ -132,7 +132,7 @@ public InternalRow get() { @Override public boolean next() { // Are we past the finish line ? - if (currentOffset > endingOffset) { + if (currentOffset >= endingOffset) { return false; } @@ -158,10 +158,13 @@ public void close() { // borrowing Gaojie's code for dealing with empty polls. private void loadRecords() { + List> partitionMessagesBuffer = new ArrayList<>(); + int retry = 0; while (retry++ < CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES) { consumerBuffer = pubSubConsumer.poll(CONSUMER_POLL_TIMEOUT); - partitionMessagesBuffer = consumerBuffer.get(targetPubSubTopicPartition); + + partitionMessagesBuffer.addAll(consumerBuffer.get(targetPubSubTopicPartition)); if (!partitionMessagesBuffer.isEmpty()) { // we got some records back for the desired partition. break; @@ -185,6 +188,7 @@ private void loadRecords() { throw new RuntimeException("Empty poll after " + retry + " retries"); } messageBuffer.addAll(partitionMessagesBuffer); + } private InternalRow processPubSubMessageToRow( diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java index aa04fbda3e8..b091d39a87d 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -25,7 +25,7 @@ import java.util.Map; import java.util.Properties; import org.apache.spark.sql.catalyst.InternalRow; -import org.testng.annotations.BeforeTest; +import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -36,7 +36,7 @@ public class VenicePubsubInputPartitionReaderTest { private VenicePubsubInputPartitionReader reader; private VenicePubsubInputPartition inputPartition; - @BeforeTest + @BeforeMethod public void setUp() { Properties jobConfig = new Properties(); int startingOffset = 0; // starting offset other than 0 needs mocking of subscription ... @@ -51,14 +51,14 @@ public void setUp() { PubSubConsumerAdapter consumer = mock(PubSubConsumerAdapter.class); - long numRecords = endingOffset - startingOffset; + long numRecords = endingOffset - startingOffset + 1; List> consumerRecordList = new ArrayList<>(); PubSubTopicPartition pubSubTopicPartition = new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), targetPartitionNumber); // fill the topic message array - for (int i = startingOffset; i < numRecords; ++i) { + for (int i = startingOffset; i <= numRecords; ++i) { byte[] keyBytes = (KAFKA_MESSAGE_KEY_PREFIX + i).getBytes(); byte[] valueBytes = (KAFKA_MESSAGE_VALUE_PREFIX + i).getBytes(); @@ -93,12 +93,12 @@ public void setUp() { @Test public void testNext() { - assertTrue(reader.next()); - for (int i = 0; i < 99; i++) { + System.out.println(); + assertTrue(reader.next()); // first record 0 + for (int i = 0; i < 98; i++) { // 99 more records reader.get(); assertTrue(reader.next()); } - reader.get(); assertFalse(reader.next()); reader.close(); }