diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java index b091d39a87d..96274dd1fca 100644 --- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java +++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java @@ -40,7 +40,7 @@ public class VenicePubsubInputPartitionReaderTest { public void setUp() { Properties jobConfig = new Properties(); int startingOffset = 0; // starting offset other than 0 needs mocking of subscription ... - int endingOffset = 100; + int endingOffset = 77; // total of 78 records int targetPartitionNumber = 42; String topicName = "BigStrangePubSubTopic_V1_rt_r"; @@ -58,7 +58,7 @@ public void setUp() { new PubSubTopicPartitionImpl(pubSubTopicRepository.getTopic(topicName), targetPartitionNumber); // fill the topic message array - for (int i = startingOffset; i <= numRecords; ++i) { + for (int i = startingOffset; i < numRecords; ++i) { byte[] keyBytes = (KAFKA_MESSAGE_KEY_PREFIX + i).getBytes(); byte[] valueBytes = (KAFKA_MESSAGE_VALUE_PREFIX + i).getBytes(); @@ -95,7 +95,7 @@ public void setUp() { public void testNext() { System.out.println(); assertTrue(reader.next()); // first record 0 - for (int i = 0; i < 98; i++) { // 99 more records + for (int i = 1; i < 77; i++) { // 78 records expected reader.get(); assertTrue(reader.next()); } @@ -105,15 +105,16 @@ public void testNext() { @Test public void testGet() { - InternalRow row = reader.get(); - System.out.println(row); - long offset = row.getLong(0); - byte[] key = row.getBinary(1); - byte[] value = row.getBinary(2); - - assertEquals(key, (KAFKA_MESSAGE_KEY_PREFIX + offset).getBytes()); - assertEquals(value, (KAFKA_MESSAGE_VALUE_PREFIX + offset).getBytes()); - + for (int i = 0; i < 77; i++) { // 78 records expected + InternalRow row = reader.get(); + System.out.println(row); + long offset = row.getLong(0); + byte[] key = row.getBinary(1); + byte[] value = row.getBinary(2); + + assertEquals(key, (KAFKA_MESSAGE_KEY_PREFIX + offset).getBytes()); + assertEquals(value, (KAFKA_MESSAGE_VALUE_PREFIX + offset).getBytes()); + } // assertEquals(row.get(0, KafkaKey), "dummyData1"); // assertTrue(row.getInt(1) >= 0); // assertTrue(row.getInt(1) < 1000);