From e09e56d58c4316410a6ad992dd3bc06935e6fe17 Mon Sep 17 00:00:00 2001
From: Ali Poursamadi
Date: Wed, 9 Oct 2024 15:01:29 -0700
Subject: [PATCH] formatting fixes, datatype fixes, spotbug fixes

---
 .../VenicePubsubPartitionConsumer.java        | 38 +++++++++----------
 .../VenicePubsubInputPartitionReader.java     |  6 ++-
 ...nicePubsubInputPartitionReaderFactory.java |  1 -
 .../pubsub/table/VenicePubsubInputScan.java   | 11 ++----
 .../pubsub/table/VenicePubsubInputTable.java  | 15 ++++----
 .../pubsub/table/VenicePubsubSource.java      |  6 ++-
 .../VenicePubsubInputPartitionReaderTest.java |  2 -
 7 files changed, 40 insertions(+), 39 deletions(-)

diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java
index ef005a55bc4..1e63a4200d2 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/clients/VenicePubsubPartitionConsumer.java
@@ -1,25 +1,25 @@
 package com.linkedin.venice.spark.input.pubsub.clients;
 
-import com.linkedin.venice.pubsub.PubSubTopicRepository;
-import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
-import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
-import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition;
-import java.util.Properties;
-
+//import com.linkedin.venice.pubsub.PubSubTopicRepository;
+//import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
+//import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
+//import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition;
+//import java.util.Properties;
 
 public class VenicePubsubPartitionConsumer {
-  private final Properties configBag;
-  private final VenicePubsubInputPartition inputPartition;
-  private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
-  private ApacheKafkaConsumerAdapter consumer;
-  private ApacheKafkaAdminAdapter admin;
-
-  public VenicePubsubPartitionConsumer(Properties configBag, VenicePubsubInputPartition inputPartition) {
-    this.configBag = configBag;
-    this.inputPartition = inputPartition;
-    String consumerName = "VeniceSpark_p-" + inputPartition.getPartitionNumber() + "_"
-        + inputPartition.getSegmentStartOffset() + "-" + inputPartition.getSegmentEndOffset();
-
-  }
+  // potentially future home of the consumer, if we decide to factor it out of the VenicePubsubPartitionReader
+  // private final Properties configBag;
+  // private final VenicePubsubInputPartition inputPartition;
+  // private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
+  // private ApacheKafkaConsumerAdapter consumer;
+  // private ApacheKafkaAdminAdapter admin;
+  //
+  // public VenicePubsubPartitionConsumer(Properties configBag, VenicePubsubInputPartition inputPartition) {
+  //   this.configBag = configBag;
+  //   this.inputPartition = inputPartition;
+  //   String consumerName = "VeniceSpark_p-" + inputPartition.getPartitionNumber() + "_"
+  //       + inputPartition.getSegmentStartOffset() + "-" + inputPartition.getSegmentEndOffset();
+  //
+  // }
 }
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java
index 63123ab7ccd..cbb7df4468b 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java
@@ -72,7 +72,7 @@ public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputP
         jobConfig,
         inputPartition,
         new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() // need to review the
-            // properties bag ...
+        // properties bag ...
             .create(
                 new VeniceProperties(jobConfig),
                 false,
@@ -116,6 +116,10 @@ public VenicePubsubInputPartitionReader(
 
     // pubSubConsumer.seek(startingOffset); // do we need this? or should we rely on the starting offset passed to
     // subscribe ?
+    initialize(); // see, MET05-J asked for this !
+  }
+
+  private void initialize() {
     next(); // get the first record ready to go.
   }
 
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java
index 1fb1986b753..b8ad293e74f 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java
@@ -32,5 +32,4 @@ public PartitionReader<InternalRow> createReader(InputPartition inputPartition)
   public boolean supportColumnarReads(InputPartition partition) {
     return false;
   }
-
 }
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java
index fa331ec71b1..8b8d1f8c209 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java
@@ -1,5 +1,6 @@
 package com.linkedin.venice.spark.input.pubsub.table;
 
+import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.pubsub.PubSubClientsFactory;
 import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
 import com.linkedin.venice.pubsub.PubSubTopicRepository;
@@ -48,7 +49,7 @@ public InputPartition[] planInputPartitions() {
 
       // surround by retries from retryUtils or something
       List<PubSubTopicPartitionInfo> listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic);
-      int numPartitions = listOfPartitions.size(); // number of partitions in the topic
+      // int numPartitions = listOfPartitions.size(); // number of partitions in the topic, in case it's needed later
 
       // need a map of int to long,long to store the start and end offsets for each partition
       Map<Integer, List<Long>> partitionOffsetsMap = new HashMap<>();
@@ -66,15 +67,11 @@ public InputPartition[] planInputPartitions() {
       }
 
       Map<Integer, List<List<Long>>> splits = PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, splitCount);
-      InputPartition[] inputPartitions =
-          PartitionSplitters.convertToInputPartitions(regionName, topicName, splits).toArray(new InputPartition[0]);
-      return (inputPartitions);
+      return PartitionSplitters.convertToInputPartitions(regionName, topicName, splits).toArray(new InputPartition[0]);
     } catch (Exception e) {
-      // handle exception
+      throw new VeniceException("Could not plan input partitions for topic " + topicName, e); // handle exception
       // something broke in the process of getting the splits
-      return null; // ? how do I tell spart that this is a failure?
     }
-
   }
 
   @Override
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java
index 55298de5502..d3f1bba9e7d 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java
@@ -2,6 +2,7 @@
 
 import static com.linkedin.venice.spark.SparkConstants.*;
 
+import com.linkedin.venice.utils.VeniceProperties;
 import java.util.Collections;
 import java.util.Properties;
 import java.util.Set;
@@ -18,21 +19,19 @@ public class VenicePubsubInputTable implements SupportsRead {
   static final String INPUT_TABLE_NAME = "venice_pubsub_table";
 
-  private final Properties properties;
+  private final VeniceProperties jobConfig;
 
-  public VenicePubsubInputTable(Properties properties) {
-    this.properties = properties;
-    // infer pubsub consumer properties from the properties
+  public VenicePubsubInputTable(VeniceProperties jobConfig) {
+    this.jobConfig = jobConfig;
     //
   }
 
   @Override
   public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
-    // convert the options to properties
-    Properties properties = new Properties();
+    Properties properties = jobConfig.getPropertiesCopy();
     properties.putAll(options.asCaseSensitiveMap());
 
-    return new VenicePubsubInputScanBuilder(properties);
+    return new VenicePubsubInputScanBuilder(properties); // should we flip this to VeniceProperties?
   }
 
   @Override
@@ -42,7 +41,7 @@ public String name() {
 
   @Override
   public StructType schema() {
-    return DEFAULT_SCHEMA;
+    return KAFKA_INPUT_TABLE_SCHEMA;
   }
 
   @Override
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java
index 844b02ee0be..784068b1504 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java
@@ -2,6 +2,7 @@
 
 import static com.linkedin.venice.spark.SparkConstants.*;
 
+import com.linkedin.venice.utils.VeniceProperties;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.spark.sql.connector.catalog.Table;
@@ -13,11 +14,13 @@ public class VenicePubsubSource implements TableProvider {
   public StructType inferSchema(CaseInsensitiveStringMap options) {
+    // there is no inference, the table is always created with the same schema
     return KAFKA_INPUT_TABLE_SCHEMA;
   }
 
   @Override
   public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
+    // we don't support partitioning, it comes from the kafka topic.
     return TableProvider.super.inferPartitioning(options);
   }
 
@@ -32,7 +35,8 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> properties) {
[… body of this hunk and the start of the VenicePubsubInputPartitionReaderTest.java diff missing from the source …]
     // assertTrue(row.getInt(1) < 1000);
     // assertTrue(row.getBoolean(2));
-    System.out.println(key);
-    System.out.println(value);
   }
 
   @Test
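
A note on the initialize() change in VenicePubsubInputPartitionReader: MET05-J is the SEI CERT Java rule "Ensure that constructors do not call overridable methods." A constructor that invokes an overridable method lets a subclass observe the object before it is fully constructed. The sketch below illustrates only the pattern; the class and method names are hypothetical, not the actual Venice reader.

    import java.util.NoSuchElementException;

    // The constructor delegates to a private helper, which cannot be overridden,
    // instead of calling a public method that a subclass could intercept while
    // the object is still half-constructed.
    class BufferedReaderSketch {
      private String current; // the eagerly buffered first record

      BufferedReaderSketch() {
        initialize(); // safe: private methods are not overridable
      }

      private void initialize() {
        current = poll(); // buffer the first record so get() works immediately
      }

      private String poll() {
        return "record-0"; // stand-in for a real consumer poll
      }

      String get() {
        if (current == null) {
          throw new NoSuchElementException("no record buffered");
        }
        return current;
      }
    }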
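On planInputPartitions() in VenicePubsubInputScan: it collects a start/end offset range per Kafka partition and hands the map to PartitionSplitters.segmentCountSplitter to cut into the requested number of splits. The actual Venice splitter is not part of this patch; purely as a sketch of the underlying idea (hypothetical names, not Venice's implementation), dividing one partition's offset range into near-equal consecutive segments could look like this:

    import java.util.ArrayList;
    import java.util.List;

    public final class OffsetSplitSketch {
      // Split [startOffset, endOffset) into `segments` near-equal consecutive
      // ranges, spreading the remainder over the first few segments.
      static List<long[]> split(long startOffset, long endOffset, int segments) {
        List<long[]> out = new ArrayList<>();
        long total = endOffset - startOffset;
        long base = total / segments;
        long remainder = total % segments;
        long cursor = startOffset;
        for (int i = 0; i < segments; i++) {
          long size = base + (i < remainder ? 1 : 0);
          out.add(new long[] { cursor, cursor + size });
          cursor += size;
        }
        return out;
      }

      public static void main(String[] args) {
        for (long[] r : split(0, 10, 3)) {
          System.out.println(r[0] + ".." + r[1]); // prints 0..4, 4..7, 7..10
        }
      }
    }

Throwing a VeniceException from the catch block (instead of the old `return null`) is also the right way to signal failure: Spark has no null-means-error convention for planInputPartitions, and a null array would only surface later as a NullPointerException in the driver.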
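On VenicePubsubInputTable.newScanBuilder: the Spark read options are layered over a copy of the job config with Properties.putAll, so when both define the same key, the per-read option wins. A small sketch of that precedence, using Properties.clone() in place of VeniceProperties.getPropertiesCopy() (assumed here to return a fresh, mutable copy, as the patch implies):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class MergeSketch {
      public static void main(String[] args) {
        Properties jobConfig = new Properties();
        jobConfig.setProperty("kafka.bootstrap.servers", "broker-a:9092"); // hypothetical keys
        jobConfig.setProperty("topic", "store_v1");

        // stands in for options.asCaseSensitiveMap() from CaseInsensitiveStringMap
        Map<String, String> options = new HashMap<>();
        options.put("topic", "store_v2"); // per-read override

        Properties merged = (Properties) jobConfig.clone(); // analogous to getPropertiesCopy()
        merged.putAll(options); // later entries overwrite earlier ones

        System.out.println(merged.getProperty("topic")); // prints store_v2: options win
      }
    }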