diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java
index cbb7df4468b..1ffeb2a4994 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReader.java
@@ -25,7 +25,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -67,14 +66,14 @@ public class VenicePubsubInputPartitionReader implements PartitionReader<Intern
   private List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> partitionMessagesBuffer = new ArrayList<>();
 
-  public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputPartition inputPartition) {
+  public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsubInputPartition inputPartition) {
     this(
         jobConfig,
         inputPartition,
-        new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() // need to review the
+        new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() // need to review the
             // properties bag ...
             .create(
-                new VeniceProperties(jobConfig),
+                jobConfig,
                 false,
                 PubSubMessageDeserializer.getInstance(),
                 // PubSubPassThroughDeserializer.getInstance(),
@@ -84,7 +83,7 @@ public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputP
 
   // testing constructor
   public VenicePubsubInputPartitionReader(
-      Properties jobConfig,
+      VeniceProperties jobConfig,
       VenicePubsubInputPartition inputPartition,
       PubSubConsumerAdapter consumer,
       PubSubTopicRepository pubSubTopicRepository) {
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java
index b8ad293e74f..41ce7e27986 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderFactory.java
@@ -1,6 +1,6 @@
 package com.linkedin.venice.spark.input.pubsub.table;
 
-import java.util.Properties;
+import com.linkedin.venice.utils.VeniceProperties;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReader;
@@ -10,9 +10,9 @@
 public class VenicePubsubInputPartitionReaderFactory implements PartitionReaderFactory {
   private static final long serialVersionUID = 1L;
 
-  private final Properties jobConfig;
+  private final VeniceProperties jobConfig;
 
-  public VenicePubsubInputPartitionReaderFactory(Properties jobConfig) {
+  public VenicePubsubInputPartitionReaderFactory(VeniceProperties jobConfig) {
     this.jobConfig = jobConfig;
   }
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java
index 8b8d1f8c209..f70490bfba3 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScan.java
@@ -1,5 +1,7 @@
 package com.linkedin.venice.spark.input.pubsub.table;
 
+import static com.linkedin.venice.vpj.VenicePushJobConstants.*;
+
 import com.linkedin.venice.exceptions.VeniceException;
 import com.linkedin.venice.pubsub.PubSubClientsFactory;
 import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
@@ -15,7 +17,6 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import org.apache.spark.sql.connector.read.Batch;
 import org.apache.spark.sql.connector.read.InputPartition;
 import org.apache.spark.sql.connector.read.PartitionReaderFactory;
@@ -24,27 +25,26 @@
 public class VenicePubsubInputScan implements Scan, Batch {
-  private final Properties jobConfig;
+  private final VeniceProperties jobConfig;
 
-  public VenicePubsubInputScan(Properties jobConfig) {
+  public VenicePubsubInputScan(VeniceProperties jobConfig) {
     this.jobConfig = jobConfig;
   }
 
   @Override
   public InputPartition[] planInputPartitions() {
     try {
-      String topicName = "test_topic_v_1";
-      String regionName = "ei-ltx1";
+      String topicName = jobConfig.getString(KAFKA_INPUT_TOPIC);
+      String regionName = jobConfig.getString(KAFKA_INPUT_FABRIC);
       int splitCount = 1000;
-      VeniceProperties veniceProperties = new VeniceProperties(jobConfig);
       PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
       PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicName);
-      PubSubClientsFactory clientsFactory = new PubSubClientsFactory(veniceProperties);
+      PubSubClientsFactory clientsFactory = new PubSubClientsFactory(jobConfig);
       // PubSubAdminAdapter pubsubAdminClient =
       //     clientsFactory.getAdminAdapterFactory().create(veniceProperties, pubSubTopicRepository);
       PubSubMessageDeserializer pubSubMessageDeserializer = PubSubMessageDeserializer.getInstance();
       PubSubConsumerAdapter pubSubConsumer = clientsFactory.getConsumerAdapterFactory()
-          .create(veniceProperties, false, pubSubMessageDeserializer, "Spark_KIF_planner");
+          .create(jobConfig, false, pubSubMessageDeserializer, "Spark_KIF_planner");
       // surround by retries from retryUtils or something
       List<PubSubTopicPartitionInfo> listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic);
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java
index 54fb008b6e2..b2a0dc77c0f 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputScanBuilder.java
@@ -1,14 +1,14 @@
 package com.linkedin.venice.spark.input.pubsub.table;
 
-import java.util.Properties;
+import com.linkedin.venice.utils.VeniceProperties;
 import org.apache.spark.sql.connector.read.Scan;
 import org.apache.spark.sql.connector.read.ScanBuilder;
 
 public class VenicePubsubInputScanBuilder implements ScanBuilder {
-  private final Properties jobConfig;
+  private final VeniceProperties jobConfig;
 
-  public VenicePubsubInputScanBuilder(Properties properties) {
+  public VenicePubsubInputScanBuilder(VeniceProperties properties) {
     this.jobConfig = properties;
   }
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java
index d3f1bba9e7d..568778c098b 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputTable.java
@@ -31,7 +31,8 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
     Properties properties = jobConfig.getPropertiesCopy();
     properties.putAll(options.asCaseSensitiveMap());
 
-    return new VenicePubsubInputScanBuilder(properties); // should we flip this to VeniceProperties?
+    return new VenicePubsubInputScanBuilder(new VeniceProperties(properties)); // should we flip this to
+                                                                               // VeniceProperties?
   }
 
   @Override
diff --git a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java
index 784068b1504..e02aa0577d8 100644
--- a/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java
+++ b/clients/venice-push-job/src/main/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubSource.java
@@ -25,9 +25,9 @@ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
   }
 
   @Override
-  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> sparkConfigs) {
+  public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> configs) {
     Properties properties = new Properties();
-    properties.putAll(sparkConfigs);
+    properties.putAll(configs);
     // the properties here is the entry point for all the configurations
     // we receive from the outer layer.
     // schem and partitioning are useless and should be discarded?
diff --git a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java
index 1f8faa5dd2b..aa04fbda3e8 100644
--- a/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java
+++ b/clients/venice-push-job/src/test/java/com/linkedin/venice/spark/input/pubsub/table/VenicePubsubInputPartitionReaderTest.java
@@ -17,6 +17,7 @@
 import com.linkedin.venice.pubsub.api.PubSubMessage;
 import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
 import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
+import com.linkedin.venice.utils.VeniceProperties;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -83,7 +84,11 @@ public void setUp() {
     inputPartition =
         new VenicePubsubInputPartition("prod-lva2", topicName, targetPartitionNumber, startingOffset, endingOffset);
 
-    reader = new VenicePubsubInputPartitionReader(jobConfig, inputPartition, consumer, pubSubTopicRepository);
+    reader = new VenicePubsubInputPartitionReader(
+        new VeniceProperties(jobConfig),
+        inputPartition,
+        consumer,
+        pubSubTopicRepository);
   }
 
   @Test
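Note on the config hand-off this change converges on: Spark's per-read options enter once at the table boundary, get overlaid on a copy of the store-level Properties, and are wrapped in a VeniceProperties that every layer below (scan builder, scan, reader factory, reader) consumes directly instead of re-wrapping raw Properties at each step. The following is a minimal, self-contained sketch of that flow, not the connector code itself; the literal keys "kafka.input.topic" / "kafka.input.fabric" and the SketchConfigFlow class name are illustrative stand-ins, where the real code resolves the keys through the KAFKA_INPUT_TOPIC and KAFKA_INPUT_FABRIC constants in VenicePushJobConstants.

import com.linkedin.venice.utils.VeniceProperties;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class SketchConfigFlow {
  public static void main(String[] args) {
    // Stand-in for the CaseInsensitiveStringMap Spark passes to newScanBuilder().
    Map<String, String> sparkOptions = new HashMap<>();
    sparkOptions.put("kafka.input.topic", "store_v1"); // assumed literal for KAFKA_INPUT_TOPIC
    sparkOptions.put("kafka.input.fabric", "dc-0"); // assumed literal for KAFKA_INPUT_FABRIC

    // Boundary conversion, mirroring VenicePubsubInputTable.newScanBuilder():
    // overlay the per-read options on a mutable Properties copy, then wrap
    // the result in VeniceProperties exactly once.
    Properties merged = new Properties();
    merged.putAll(sparkOptions);
    VeniceProperties jobConfig = new VeniceProperties(merged);

    // Below the boundary, planInputPartitions() and the readers use typed
    // accessors instead of raw java.util.Properties string lookups.
    String topicName = jobConfig.getString("kafka.input.topic");
    String regionName = jobConfig.getString("kafka.input.fabric");
    System.out.println("planning splits for " + topicName + " in " + regionName);
  }
}

One practical upside of wrapping once at the boundary: the planner and every per-partition reader see the same frozen view of the config, which matters once the reader factory is serialized out to Spark executors.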