diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index dd42cb79bf..90e5fb1f53 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,8 +1,6 @@ package com.linkedin.davinci; -import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED; -import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; -import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; +import static com.linkedin.venice.ConfigKeys.*; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_MEMORY_LIMIT_REACHED; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_OTHER; @@ -468,7 +466,7 @@ private synchronized void bootstrap() { configLoader.getVeniceServerConfig()); ingestionBackend.addIngestionNotifier(ingestionListener); - if (!configLoader.getCombinedProperties().getBoolean(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false)) { + if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) { // Subscribe all bootstrap version partitions. storeNameToBootstrapVersionMap.forEach((storeName, version) -> { List partitions = storeNameToPartitionListMap.get(storeName); @@ -477,10 +475,6 @@ private synchronized void bootstrap() { AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); StoreBackend storeBackend = getStoreOrThrow(storeName); - ComplementSet subscription = ComplementSet.newSet(storeBackend.getSubscription()); - ComplementSet unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); - unassignedPartitionSet.removeAll(subscription); - storeBackend.unsubscribe(unassignedPartitionSet); storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); }); } diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index 5b5389aba0..8c500dc1cf 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -9,13 +9,7 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; -import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; -import static com.linkedin.venice.ConfigKeys.DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED; -import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; -import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; -import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; -import static com.linkedin.venice.ConfigKeys.KAFKA_BOOTSTRAP_SERVERS; -import static com.linkedin.venice.ConfigKeys.ZOOKEEPER_ADDRESS; +import static com.linkedin.venice.ConfigKeys.*; import static com.linkedin.venice.client.store.ClientFactory.getTransportClient; import static org.apache.avro.Schema.Type.RECORD; @@ -711,7 +705,7 @@ private VeniceConfigLoader buildVeniceConfig() { .put(CLUSTER_NAME, clusterName) .put(ZOOKEEPER_ADDRESS, zkAddress) .put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers) - .put(DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED, false) + .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true) .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK) .put(INGESTION_USE_DA_VINCI_CLIENT, true) .put( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index f83835b9a6..61acef067d 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -98,7 +98,8 @@ private ConfigKeys() { "da.vinci.current.version.bootstrapping.quota.bytes.per.second"; // Prevent automatic subscription of partitions while bootstrapping. - public static final String DA_VINCI_BOOTSTRAP_SUBSCRIPTION_DISABLED = "da.vinci.bootstrap.subscription.disabled"; + public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY = + "da.vinci.subscribe.on.disk.partitions.automatically"; // Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used. public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND =