diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 38c4b879a8..27144d6f02 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -1,5 +1,6 @@ package com.linkedin.davinci; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX; import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION; import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL; @@ -207,6 +208,7 @@ public DaVinciBackend( String pid = Utils.getPid(); String instanceSuffix = configLoader.getCombinedProperties().getString(PUSH_STATUS_INSTANCE_NAME_SUFFIX, (pid == null ? "NA" : pid)); + // Current instance name. String instanceName = Utils.getHostName() + "_" + instanceSuffix; // Fetch latest update schema's protocol ID for Push Status Store from Router. @@ -466,16 +468,18 @@ private synchronized void bootstrap() { configLoader.getVeniceServerConfig()); ingestionBackend.addIngestionNotifier(ingestionListener); - // Subscribe all bootstrap version partitions. 
- storeNameToBootstrapVersionMap.forEach((storeName, version) -> { - List partitions = storeNameToPartitionListMap.get(storeName); - String versionTopic = version.kafkaTopicName(); - LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic); - AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); - aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); - StoreBackend storeBackend = getStoreOrThrow(storeName); - storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); - }); + if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) { + // Subscribe all bootstrap version partitions. + storeNameToBootstrapVersionMap.forEach((storeName, version) -> { + List partitions = storeNameToPartitionListMap.get(storeName); + String versionTopic = version.kafkaTopicName(); + LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic); + AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); + aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); + StoreBackend storeBackend = getStoreOrThrow(storeName); + storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); + }); + } } @Override diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java index d2b9ecbf92..f62c65fedc 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/StoreBackend.java @@ -108,6 +108,10 @@ public StoreBackendStats getStats() { return stats; } + public ComplementSet getSubscription() { + return subscription; + } + public ReferenceCounted getDaVinciCurrentVersion() { return daVinciCurrentVersionRef.get(); } @@ -154,6 +158,7 @@ synchronized CompletableFuture 
subscribe( // Recreate store config that was potentially deleted by unsubscribe. config.store(); } + subscription.addAll(partitions); if (daVinciFutureVersion == null) { diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java index 348b48128f..2a900a2cc4 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/client/AvroGenericDaVinciClient.java @@ -10,6 +10,7 @@ import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION; import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED; import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT; import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT; import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS; @@ -710,6 +711,7 @@ private VeniceConfigLoader buildVeniceConfig() { .put(CLUSTER_NAME, clusterName) .put(ZOOKEEPER_ADDRESS, zkAddress) .put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers) + .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true) .put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK) .put(INGESTION_USE_DA_VINCI_CLIENT, true) .put( diff --git a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java index b6aecf1847..ffa67980c2 100644 --- a/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java +++ 
b/internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java @@ -97,6 +97,10 @@ private ConfigKeys() { public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND = "da.vinci.current.version.bootstrapping.quota.bytes.per.second"; + // Whether a Da Vinci client automatically subscribes, during bootstrap, to the partitions already present on disk. Defaults to true. + public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY = + "da.vinci.subscribe.on.disk.partitions.automatically"; + // Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used. public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND = "kafka.fetch.quota.unordered.bytes.per.second"; diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 5714633ec9..e6b420f4f9 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -11,6 +11,7 @@ import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS; import static com.linkedin.venice.ConfigKeys.DAVINCI_PUSH_STATUS_SCAN_INTERVAL_IN_SECONDS; import static com.linkedin.venice.ConfigKeys.DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED; +import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY; import static com.linkedin.venice.ConfigKeys.KAFKA_FETCH_QUOTA_RECORDS_PER_SECOND; import static com.linkedin.venice.ConfigKeys.PERSISTENCE_TYPE; import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_STORE_ENABLED; @@ -46,7 +47,9 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.d2.balancer.D2ClientBuilder; +import com.linkedin.davinci.DaVinciBackend; import
com.linkedin.davinci.DaVinciUserApp; +import com.linkedin.davinci.StoreBackend; import com.linkedin.davinci.client.AvroGenericDaVinciClient; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; @@ -83,15 +86,16 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; +import com.linkedin.venice.utils.ComplementSet; import com.linkedin.venice.utils.DataProviderUtils; import com.linkedin.venice.utils.ForkedJavaProcess; import com.linkedin.venice.utils.IntegrationTestPushUtils; import com.linkedin.venice.utils.Pair; import com.linkedin.venice.utils.PropertyBuilder; import com.linkedin.venice.utils.TestUtils; import com.linkedin.venice.utils.TestWriteUtils; import com.linkedin.venice.utils.Utils; import com.linkedin.venice.utils.VeniceProperties; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -1283,6 +1287,88 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { } } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) + public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Exception { + String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); + String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); + VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) + .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) + .put(DATA_BASE_PATH, baseDataPath) + .put(PERSISTENCE_TYPE, ROCKS_DB) + .put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true) + .put(PUSH_STATUS_STORE_ENABLED, true) + .put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000) + 
.put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, false) + .build(); + + MetricsRepository metricsRepository = new MetricsRepository(); + + // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, + VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + metricsRepository, + backendConfig)) { + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig); + + // Test non-existent key access + client1.subscribeAll().get(); + assertNull(client1.get(KEY_COUNT + 1).get()); + + // Test single-get access + Map keyValueMap = new HashMap<>(); + for (int k = 0; k < KEY_COUNT; ++k) { + assertEquals(client1.get(k).get(), 1); + keyValueMap.put(k, 1); + } + + // Test batch-get access + assertEquals(client1.batchGet(keyValueMap.keySet()).get(), keyValueMap); + + // Test automatic new version ingestion + for (int i = 0; i < 2; ++i) { + // Test per-version partitioning parameters + int partitionCount = i + 1; + String iString = String.valueOf(i); + cluster.useControllerClient(controllerClient -> { + ControllerResponse response = controllerClient.updateStore( + storeName1, + new UpdateStoreQueryParams().setPartitionerClass(ConstantVenicePartitioner.class.getName()) + .setPartitionCount(partitionCount) + .setPartitionerParams( + Collections.singletonMap(ConstantVenicePartitioner.CONSTANT_PARTITION, iString))); + assertFalse(response.isError(), response.getError()); + }); + + Integer expectedValue = cluster.createVersion(storeName1, KEY_COUNT); + TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { + for (int k = 0; k < KEY_COUNT; ++k) { + Object readValue = client1.get(k).get(); + assertEquals(readValue, expectedValue); + } + }); + } + } + + // Test managed clients & data cleanup + try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( + d2Client, + 
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, + metricsRepository, + backendConfig, + Optional.of(Collections.singleton(storeName1)))) { + + DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, daVinciConfig); + + DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); + if (daVinciBackend != null) { + StoreBackend storeBackend = daVinciBackend.getStoreOrThrow(storeName1); + ComplementSet subscription = storeBackend.getSubscription(); + assertTrue(subscription.isEmpty()); + } + } + } + @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) public void testPartialSubscription(DaVinciConfig daVinciConfig) throws Exception { String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);