Skip to content

Commit

Permalink
better handling of empty returns from poll
Browse files Browse the repository at this point in the history
  • Loading branch information
eldernewborn committed Nov 1, 2024
1 parent 7c62218 commit 2aaa011
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ public class VenicePubsubInputPartitionReader implements PartitionReader<Interna
private Map<PubSubTopicPartition, List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>>> consumerBuffer =
new HashMap<>();
// the buffer that holds the relevant messages for the current partition
private List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> partitionMessagesBuffer = new ArrayList<>();

public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsubInputPartition inputPartition) {
this(
Expand Down Expand Up @@ -119,7 +118,8 @@ public VenicePubsubInputPartitionReader(
}

private void initialize() {
next(); // get the first record ready to go.
next();// get the first record ready to go.
recordsServed = 0; // reset the counter
}

// if it returns a row, it's going to be key and value and offset in the row in that order
Expand All @@ -132,7 +132,7 @@ public InternalRow get() {
@Override
public boolean next() {
// Are we past the finish line ?
if (currentOffset > endingOffset) {
if (currentOffset >= endingOffset) {
return false;
}

Expand All @@ -158,10 +158,13 @@ public void close() {

// borrowing Gaojie's code for dealing with empty polls.
private void loadRecords() {
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> partitionMessagesBuffer = new ArrayList<>();

int retry = 0;
while (retry++ < CONSUMER_POLL_EMPTY_RESULT_RETRY_TIMES) {
consumerBuffer = pubSubConsumer.poll(CONSUMER_POLL_TIMEOUT);
partitionMessagesBuffer = consumerBuffer.get(targetPubSubTopicPartition);

partitionMessagesBuffer.addAll(consumerBuffer.get(targetPubSubTopicPartition));
if (!partitionMessagesBuffer.isEmpty()) {
// we got some records back for the desired partition.
break;
Expand All @@ -185,6 +188,7 @@ private void loadRecords() {
throw new RuntimeException("Empty poll after " + retry + " retries");
}
messageBuffer.addAll(partitionMessagesBuffer);

}

private InternalRow processPubSubMessageToRow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.util.Map;
import java.util.Properties;
import org.apache.spark.sql.catalyst.InternalRow;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;


Expand All @@ -36,7 +36,7 @@ public class VenicePubsubInputPartitionReaderTest {
private VenicePubsubInputPartitionReader reader;
private VenicePubsubInputPartition inputPartition;

@BeforeTest
@BeforeMethod
public void setUp() {
Properties jobConfig = new Properties();
int startingOffset = 0; // starting offset other than 0 needs mocking of subscription ...
Expand All @@ -51,14 +51,14 @@ public void setUp() {

PubSubConsumerAdapter consumer = mock(PubSubConsumerAdapter.class);

long numRecords = endingOffset - startingOffset;
long numRecords = endingOffset - startingOffset + 1;
List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> consumerRecordList = new ArrayList<>();

PubSubTopicPartition pubSubTopicPartition =
new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), targetPartitionNumber);

// fill the topic message array
for (int i = startingOffset; i < numRecords; ++i) {
for (int i = startingOffset; i <= numRecords; ++i) {
byte[] keyBytes = (KAFKA_MESSAGE_KEY_PREFIX + i).getBytes();
byte[] valueBytes = (KAFKA_MESSAGE_VALUE_PREFIX + i).getBytes();

Expand Down Expand Up @@ -93,12 +93,12 @@ public void setUp() {

@Test
public void testNext() {
assertTrue(reader.next());
for (int i = 0; i < 99; i++) {
System.out.println();
assertTrue(reader.next()); // first record 0
for (int i = 0; i < 98; i++) { // 99 more records
reader.get();
assertTrue(reader.next());
}
reader.get();
assertFalse(reader.next());
reader.close();
}
Expand Down

0 comments on commit 2aaa011

Please sign in to comment.