Skip to content

Commit

Permalink
[dvc] Partition Difference Calculation and Update to Partition Subscr…
Browse files Browse the repository at this point in the history
…iption. Enabled config flag over bootstrap subscription
  • Loading branch information
kristyelee committed Dec 9, 2024
1 parent 699c200 commit 002d55c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 8 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.linkedin.davinci;

import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.*;
import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED;
import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT;
import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS;
import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS;
import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS;
import static com.linkedin.venice.client.store.ClientFactory.getTransportClient;
import static org.apache.avro.Schema.Type.RECORD;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,16 @@
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
import com.linkedin.venice.store.rocksdb.RocksDBUtils;
import com.linkedin.venice.utils.*;
import com.linkedin.venice.utils.ComplementSet;
import com.linkedin.venice.utils.DataProviderUtils;
import com.linkedin.venice.utils.ForkedJavaProcess;
import com.linkedin.venice.utils.IntegrationTestPushUtils;
import com.linkedin.venice.utils.Pair;
import com.linkedin.venice.utils.PropertyBuilder;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.TestWriteUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.VeniceProperties;
import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils;
import com.linkedin.venice.writer.VeniceWriter;
import com.linkedin.venice.writer.VeniceWriterFactory;
Expand Down Expand Up @@ -1351,11 +1360,10 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti

DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend();
storeBackend = daVinciBackend.getStoreOrThrow(storeName1);
List<Integer> partitions = new ArrayList<Integer>();
partitions.add(1);
partitions.add(2);
partitions.add(3);
partitions.add(4);
List<Integer> partitions = new ArrayList<>();
for (int i = 0; i < 5; ++i) {
partitions.add(i);
}
ComplementSet<Integer> subscription = ComplementSet.newSet(partitions);
storeBackend.subscribe(subscription);

Expand Down

0 comments on commit 002d55c

Please sign in to comment.