
Commit

formatting fixes
datatype fixes
spotbug fixes
eldernewborn committed Nov 1, 2024
1 parent a79f711 commit e09e56d
Showing 7 changed files with 40 additions and 39 deletions.
@@ -1,25 +1,25 @@
package com.linkedin.venice.spark.input.pubsub.clients;

import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition;
import java.util.Properties;

//import com.linkedin.venice.pubsub.PubSubTopicRepository;
//import com.linkedin.venice.pubsub.adapter.kafka.admin.ApacheKafkaAdminAdapter;
//import com.linkedin.venice.pubsub.adapter.kafka.consumer.ApacheKafkaConsumerAdapter;
//import com.linkedin.venice.spark.input.pubsub.table.VenicePubsubInputPartition;
//import java.util.Properties;

public class VenicePubsubPartitionConsumer {
private final Properties configBag;
private final VenicePubsubInputPartition inputPartition;
private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
private ApacheKafkaConsumerAdapter consumer;
private ApacheKafkaAdminAdapter admin;

public VenicePubsubPartitionConsumer(Properties configBag, VenicePubsubInputPartition inputPartition) {
this.configBag = configBag;
this.inputPartition = inputPartition;
String consumerName = "VeniceSpark_p-" + inputPartition.getPartitionNumber() + "_"
+ inputPartition.getSegmentStartOffset() + "-" + inputPartition.getSegmentEndOffset();

}
// potentially future home of consumer if we decide to factor it out of the VenicePubsubPartitionReader

// private final Properties configBag;
// private final VenicePubsubInputPartition inputPartition;
// private final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
// private ApacheKafkaConsumerAdapter consumer;
// private ApacheKafkaAdminAdapter admin;
//
// public VenicePubsubPartitionConsumer(Properties configBag, VenicePubsubInputPartition inputPartition) {
// this.configBag = configBag;
// this.inputPartition = inputPartition;
// String consumerName = "VeniceSpark_p-" + inputPartition.getPartitionNumber() + "_"
// + inputPartition.getSegmentStartOffset() + "-" + inputPartition.getSegmentEndOffset();
//
// }
}
@@ -72,7 +72,7 @@ public VenicePubsubInputPartitionReader(Properties jobConfig, VenicePubsubInputP
jobConfig,
inputPartition,
new PubSubClientsFactory(new VeniceProperties(jobConfig)).getConsumerAdapterFactory() // need to review the
// properties bag ...
.create(
new VeniceProperties(jobConfig),
false,
@@ -116,6 +116,10 @@ public VenicePubsubInputPartitionReader(
// pubSubConsumer.seek(startingOffset); // do we need this? or should we rely on the starting offset passed to
// subscribe ?

initialize(); // see, MET05-J asked for this !
}

private void initialize() {
next(); // get the first record ready to go.
}
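Note on the initialize() call above: the code comment cites SEI CERT rule MET05-J ("Ensure that constructors do not call overridable methods"). Routing the first next() through a private method means no subclass override can run against a partially constructed reader. A minimal, standalone illustration of the rule (class and field names are illustrative only, not from this repository):

    public class ReaderExample {
      private boolean primed;

      public ReaderExample() {
        initialize(); // safe: a private method cannot be overridden, so no subclass code
                      // can observe this object before the constructor has finished
      }

      private void initialize() {
        primed = true; // mirrors the initialize()/next() priming pattern above
      }
    }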

@@ -32,5 +32,4 @@ public PartitionReader<InternalRow> createReader(InputPartition inputPartition)
public boolean supportColumnarReads(InputPartition partition) {
return false;
}

}
@@ -1,5 +1,6 @@
package com.linkedin.venice.spark.input.pubsub.table;

import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.pubsub.PubSubClientsFactory;
import com.linkedin.venice.pubsub.PubSubTopicPartitionInfo;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
@@ -48,7 +49,7 @@ public InputPartition[] planInputPartitions() {
// surround by retries from retryUtils or something
List<PubSubTopicPartitionInfo> listOfPartitions = pubSubConsumer.partitionsFor(pubSubTopic);

int numPartitions = listOfPartitions.size(); // number of partitions in the topic
// int numPartitions = listOfPartitions.size(); // number of partitions in the topic in case.

// need a map of int to long,long to store the start and end offsets for each partition
Map<Integer, List<Long>> partitionOffsetsMap = new HashMap<>();
@@ -66,15 +67,11 @@ public InputPartition[] planInputPartitions() {
}

Map<Integer, List<List<Long>>> splits = PartitionSplitters.segmentCountSplitter(partitionOffsetsMap, splitCount);
InputPartition[] inputPartitions =
PartitionSplitters.convertToInputPartitions(regionName, topicName, splits).toArray(new InputPartition[0]);
return (inputPartitions);
return PartitionSplitters.convertToInputPartitions(regionName, topicName, splits).toArray(new InputPartition[0]);
} catch (Exception e) {
// handle exception
throw new VeniceException("Could not get FileSystem", e);// handle exception
// something broke in the process of getting the splits
return null; // ? how do I tell spart that this is a failure?
}

}
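Note on the two changes in this hunk: the split computation is now returned directly, and the catch block throws a VeniceException instead of returning null. Throwing surfaces the failure while Spark is still planning the scan, whereas returning null would only resurface later as a NullPointerException far from the root cause. Below is a standalone, hypothetical sketch of both ideas (dividing an offset range into a requested number of segments, and failing loudly on bad input); it is not the actual PartitionSplitters implementation:

    import java.util.ArrayList;
    import java.util.List;

    public final class PlanSplitsExample {
      // Divide [startOffset, endOffset) into splitCount roughly equal segments.
      static List<long[]> planOffsetRanges(long startOffset, long endOffset, int splitCount) {
        if (endOffset < startOffset || splitCount <= 0) {
          // Throw instead of returning null: the caller (e.g. Spark planning) sees the
          // root cause immediately rather than an NPE later.
          throw new IllegalArgumentException(
              "Cannot split offsets [" + startOffset + ", " + endOffset + ") into " + splitCount + " segments");
        }
        List<long[]> ranges = new ArrayList<>();
        long total = endOffset - startOffset;
        for (int i = 0; i < splitCount; i++) {
          long from = startOffset + total * i / splitCount;
          long to = startOffset + total * (i + 1) / splitCount;
          ranges.add(new long[] { from, to });
        }
        return ranges;
      }

      public static void main(String[] args) {
        planOffsetRanges(0, 1000, 3).forEach(r -> System.out.println(r[0] + " - " + r[1]));
      }
    }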

@Override
@@ -2,6 +2,7 @@

import static com.linkedin.venice.spark.SparkConstants.*;

import com.linkedin.venice.utils.VeniceProperties;
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
@@ -18,21 +19,19 @@

public class VenicePubsubInputTable implements SupportsRead {
static final String INPUT_TABLE_NAME = "venice_pubsub_table";
private final Properties properties;
private final VeniceProperties jobConfig;

public VenicePubsubInputTable(Properties properties) {
this.properties = properties;
// infer pubsub consumer properties from the properties
public VenicePubsubInputTable(VeniceProperties jobConfig) {
this.jobConfig = jobConfig;
//
}

@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
// convert the options to properties
Properties properties = new Properties();
Properties properties = jobConfig.getPropertiesCopy();
properties.putAll(options.asCaseSensitiveMap());

return new VenicePubsubInputScanBuilder(properties);
return new VenicePubsubInputScanBuilder(properties); // should we flip this to VeniceProperties?
}
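The merge above copies the job-level config and then layers the per-read options on top, so a key supplied via option(...) at read time overrides the same key from jobConfig while the shared VeniceProperties stays untouched. A small self-contained illustration of that precedence (the property keys are hypothetical, not keys defined by this diff):

    import java.util.Properties;

    public final class OptionMergeExample {
      public static void main(String[] args) {
        Properties jobDefaults = new Properties();
        jobDefaults.setProperty("venice.pubsub.topic", "store_v1");        // hypothetical key, job-level default
        jobDefaults.setProperty("kafka.bootstrap.servers", "broker:9092"); // hypothetical key

        Properties perRead = new Properties();
        perRead.setProperty("venice.pubsub.topic", "store_v2");            // supplied via option(...) at read time

        Properties merged = new Properties();
        merged.putAll(jobDefaults); // analogous to jobConfig.getPropertiesCopy()
        merged.putAll(perRead);     // analogous to putAll(options.asCaseSensitiveMap()); later puts win

        System.out.println(merged.getProperty("venice.pubsub.topic"));     // prints store_v2
      }
    }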

@Override
@@ -42,7 +41,7 @@ public String name() {

@Override
public StructType schema() {
return DEFAULT_SCHEMA;
return KAFKA_INPUT_TABLE_SCHEMA;
}

@Override
@@ -2,6 +2,7 @@

import static com.linkedin.venice.spark.SparkConstants.*;

import com.linkedin.venice.utils.VeniceProperties;
import java.util.Map;
import java.util.Properties;
import org.apache.spark.sql.connector.catalog.Table;
@@ -13,11 +14,13 @@

public class VenicePubsubSource implements TableProvider {
public StructType inferSchema(CaseInsensitiveStringMap options) {
// there is no inference, the table is always created with the same schema
return KAFKA_INPUT_TABLE_SCHEMA;
}

@Override
public Transform[] inferPartitioning(CaseInsensitiveStringMap options) {
// we don't support partitioning, it comes from the kafka topic.
return TableProvider.super.inferPartitioning(options);
}

@@ -32,7 +35,8 @@ public Table getTable(StructType schema, Transform[] partitioning, Map<String, S

// VeniceProperties consumerProperties = KafkaInputUtils.getConsumerProperties(properties);

return new VenicePubsubInputTable(properties);
// if we want to break the bag-chain , this is the place !
return new VenicePubsubInputTable(new VeniceProperties(properties));
}

@Override
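Since VenicePubsubSource is a Spark TableProvider, a job can point spark.read() at it by class name; whatever is passed through option(...) arrives in the Map<String, String> that getTable wraps into VeniceProperties above. A hedged usage sketch, assuming the class is on the classpath and importable from where the snippet lives; the option keys are hypothetical placeholders, not keys defined by this diff:

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public final class VenicePubsubReadExample {
      public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("venice-pubsub-read").getOrCreate();

        Dataset<Row> rows = spark.read()
            .format(VenicePubsubSource.class.getName()) // TableProvider resolved by fully qualified class name
            .option("venice.pubsub.topic", "store_v1")  // hypothetical key
            .option("venice.pubsub.region", "dc-0")     // hypothetical key
            .load();                                    // schema comes from inferSchema(): KAFKA_INPUT_TABLE_SCHEMA

        rows.show();
        spark.stop();
      }
    }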
@@ -113,8 +113,6 @@ public void testGet() {
// assertTrue(row.getInt(1) >= 0);
// assertTrue(row.getInt(1) < 1000);
// assertTrue(row.getBoolean(2));
System.out.println(key);
System.out.println(value);
}

@Test
