
Commit

unified the usage of property bags, it's inevitable ...
eldernewborn committed Nov 1, 2024
1 parent e09e56d commit 7c62218
Showing 7 changed files with 28 additions and 23 deletions.
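The thrust of the change: the Spark pubsub input classes previously took raw java.util.Properties and re-wrapped them in VeniceProperties at each call site; they now accept VeniceProperties directly, wrapped once at the boundary. A minimal sketch of that pattern, assuming Venice is on the classpath (the key and value are hypothetical):

import java.util.Properties;
import com.linkedin.venice.utils.VeniceProperties;

public class PropertyBagSketch {
  public static void main(String[] args) {
    Properties raw = new Properties();
    raw.setProperty("example.key", "example.value"); // hypothetical key, illustration only

    // Wrap once; from here on the bag is read through VeniceProperties getters
    // instead of re-wrapping a mutable Properties object at every call site.
    VeniceProperties bag = new VeniceProperties(raw);
    System.out.println(bag.getString("example.key")); // getString is the accessor the scan uses below
  }
}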
@@ -25,7 +25,6 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -67,14 +66,14 @@ public class VenicePubsubInputPartitionReader implements PartitionReader<Interna
// the buffer that holds the relevant messages for the current partition
private List<PubSubMessage<KafkaKey, KafkaMessageEnvelope, Long>> partitionMessagesBuffer = new ArrayList<>();

public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputPartition inputPartition) {
public VenicePubsubInputPartitionReader(VeniceProperties jobConfig, VenicePubsubInputPartition inputPartition) {
this(
jobConfig,
inputPartition,
new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() // need to review the
new PubSubClientsFactory(jobConfig).getConsumerAdapterFactory() // need to review the
// properties bag ...
.create(
new VeniceProperties(jobConfig),
jobConfig,
false,
PubSubMessageDeserializer.getInstance(),
// PubSubPassThroughDeserializer.getInstance(),
@@ -84,7 +83,7 @@ public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputP

// testing constructor
public VenicePubsubInputPartitionReader(
Properties jobConfig,
VeniceProperties jobConfig,
VenicePubsubInputPartition inputPartition,
PubSubConsumerAdapter consumer,
PubSubTopicRepository pubSubTopicRepository) {
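With the new signature, a caller hands the production reader an already-wrapped bag. A sketch under that assumption; every literal below is a placeholder, and a real bag must also carry the pubsub client configs the consumer factory needs:

import java.util.Properties;
import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition;
import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartitionReader;
import com.linkedin.venice.utils.VeniceProperties;

public class ReaderConstructionSketch {
  public static void main(String[] args) {
    // Placeholder bag; a real one carries the pubsub consumer configs.
    VeniceProperties jobConfig = new VeniceProperties(new Properties());
    // Region, topic, partition number, and offset bounds are placeholders.
    VenicePubsubInputPartition partition =
        new VenicePubsubInputPartition("prod-lva2", "test_topic_v_1", 42, 0L, 100L);
    VenicePubsubInputPartitionReader reader =
        new VenicePubsubInputPartitionReader(jobConfig, partition);
  }
}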
@@ -1,6 +1,6 @@
package com.linkedin.venice.spark.input.pubsub.table;

import java.util.Properties;
import com.linkedin.venice.utils.VeniceProperties;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
@@ -10,9 +10,9 @@
public class VenicePubsubInputPartitionReaderFactory implements PartitionReaderFactory {
private static final long serialVersionUID = 1L;

private final Properties jobConfig;
private final VeniceProperties jobConfig;

public VenicePubsubInputPartitionReaderFactory(Properties jobConfig) {
public VenicePubsubInputPartitionReaderFactory(VeniceProperties jobConfig) {
this.jobConfig = jobConfig;
}

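createReader is not part of this hunk; a plausible sketch of how the factory likely delegates to the reader, with the instanceof guard being an assumption rather than something this diff confirms:

// Inside VenicePubsubInputPartitionReaderFactory (sketch only).
@Override
public PartitionReader<InternalRow> createReader(InputPartition partition) {
  if (!(partition instanceof VenicePubsubInputPartition)) {
    throw new IllegalArgumentException("Unexpected InputPartition type: " + partition.getClass().getName());
  }
  // The jobConfig field is already a VeniceProperties after this commit.
  return new VenicePubsubInputPartitionReader(jobConfig, (VenicePubsubInputPartition) partition);
}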
@@ -1,5 +1,7 @@
package com.linkedin.venice.spark.input.pubsub.table;

import static com.linkedin.venice.vpj.VenicePushJobConstants.*;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
@@ -15,7 +17,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
@@ -24,27 +25,26 @@


public class VenicePubsubInputScan implements Scan, Batch {
private final Properties jobConfig;
private final VeniceProperties jobConfig;

public VenicePubsubInputScan(Properties jobConfig) {
public VenicePubsubInputScan(VeniceProperties jobConfig) {
this.jobConfig = jobConfig;
}

@Override
public InputPartition[] planInputPartitions() {
try {
String topicName = "test_topic_v_1";
String regionName = "ei-ltx1";
String topicName = jobConfig.getString(KAFKA_INPUT_TOPIC);
String regionName = jobConfig.getString(KAFKA_INPUT_FABRIC);
int splitCount = 1000;
VeniceProperties veniceProperties = new VeniceProperties(jobConfig);
PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
PubSubTopic pubSubTopic = pubSubTopicRepository.getTopic(topicName);
PubSubClientsFactory clientsFactory = new PubSubClientsFactory(veniceProperties);
PubSubClientsFactory clientsFactory = new PubSubClientsFactory(jobConfig);
// PubSubAdminAdapter pubsubAdminClient =
// clientsFactory.getAdminAdapterFactory().create(veniceProperties, pubSubTopicRepository);
PubSubMessageDeserializer pubSubMessageDeserializer = PubSubMessageDeserializer.getInstance();
PubSubConsumerAdapter pubSubConsumer = clientsFactory.getConsumerAdapterFactory()
.create(veniceProperties, false, pubSubMessageDeserializer, "Spark_KIF_planner");
.create(jobConfig, false, pubSubMessageDeserializer, "Spark_KIF_planner");

// surround by retries from retryUtils or something
List<PubSubTopicPartitionInfo> listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic);
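planInputPartitions now pulls the topic and fabric from the bag via the statically imported VenicePushJobConstants keys, instead of the hard-coded test values. A sketch of supplying that config; the literals are placeholders, and a real bag also needs the pubsub client configs used by PubSubClientsFactory:

import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_FABRIC;
import static com.linkedin.venice.vpj.VenicePushJobConstants.KAFKA_INPUT_TOPIC;

import java.util.Properties;
import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputScan;
import com.linkedin.venice.utils.VeniceProperties;
import org.apache.spark.sql.connector.read.InputPartition;

public class ScanConfigSketch {
  public static void main(String[] args) {
    Properties raw = new Properties();
    raw.setProperty(KAFKA_INPUT_TOPIC, "test_topic_v_1"); // placeholder topic
    raw.setProperty(KAFKA_INPUT_FABRIC, "ei-ltx1");       // placeholder fabric
    VenicePubsubInputScan scan = new VenicePubsubInputScan(new VeniceProperties(raw));
    InputPartition[] partitions = scan.planInputPartitions();
    System.out.println("planned " + partitions.length + " partitions");
  }
}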
@@ -1,14 +1,14 @@
package com.linkedin.venice.spark.input.pubsub.table;

import java.util.Properties;
import com.linkedin.venice.utils.VeniceProperties;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.ScanBuilder;


public class VenicePubsubInputScanBuilder implements ScanBuilder {
private final Properties jobConfig;
private final VeniceProperties jobConfig;

public VenicePubsubInputScanBuilder(Properties properties) {
public VenicePubsubInputScanBuilder(VeniceProperties properties) {
this.jobConfig = properties;
}

@@ -31,7 +31,8 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
Properties properties = jobConfig.getPropertiesCopy();
properties.putAll(options.asCaseSensitiveMap());

return new VenicePubsubInputScanBuilder(properties); // should we flip this to VeniceProperties?
return new VenicePubsubInputScanBuilder(new VeniceProperties(properties)); // should we flip this to
// VeniceProperties?
}

@Override
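Since the Spark read options are applied on top of a copy of the job config (see newScanBuilder above), per-read options take precedence over the baseline bag. A self-contained sketch of that precedence with hypothetical keys:

import java.util.Map;
import java.util.Properties;

public class OptionPrecedenceSketch {
  public static void main(String[] args) {
    Properties merged = new Properties();
    merged.setProperty("buffer.size", "1024");             // hypothetical baseline job config
    merged.putAll(Map.of("buffer.size", "4096"));          // hypothetical per-read Spark option
    System.out.println(merged.getProperty("buffer.size")); // prints 4096: the read option wins
  }
}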
@@ -25,9 +25,9 @@ public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
}

@Override
public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> sparkConfigs) {
public Table getTable(StructType schema, Transform[] partitioning, Map<String, String> configs) {
Properties properties = new Properties();
properties.putAll(sparkConfigs);
properties.putAll(configs);
// the properties here are the entry point for all the configurations
// we receive from the outer layer.
// schema and partitioning are useless and should be discarded?
@@ -17,6 +17,7 @@
import com.linkedin.venice.pubsub.api.PubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.storage.protocol.ChunkedKeySuffix;
import com.linkedin.venice.utils.VeniceProperties;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
@@ -83,7 +84,11 @@ public void setUp() {
inputPartition =
new VenicePubsubInputPartition("prod-lva2", topicName, targetPartitionNumber, startingOffset, endingOffset);

reader = new VenicePubsubInputPartitionReader(jobConfig, inputPartition, consumer, pubSubTopicRepository);
reader = new VenicePubsubInputPartitionReader(
new VeniceProperties(jobConfig),
inputPartition,
consumer,
pubSubTopicRepository);
}

@Test
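Once built, the reader is driven through Spark's PartitionReader contract (next, get, close). A sketch of the consumption loop a test might use, assuming the enclosing test method declares throws Exception; the row assertions are placeholders:

// Sketch: standard Spark DSv2 consumption loop over the reader built in setUp().
try {
  while (reader.next()) {
    InternalRow row = reader.get();
    // assert on row contents here
  }
} finally {
  reader.close();
}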
