diff --git a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java index 6266b9efc6..52c181d095 100644 --- a/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java +++ b/clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java @@ -469,16 +469,20 @@ private synchronized void bootstrap() { // Subscribe all bootstrap version partitions. storeNameToBootstrapVersionMap.forEach((storeName, version) -> { - List partitions = storeNameToPartitionListMap.get(storeName); + List partitions = storeNameToPartitionListMap.get(storeName); // [0 1] String versionTopic = version.kafkaTopicName(); LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic); AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic); aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine); StoreBackend storeBackend = getStoreOrThrow(storeName); - ComplementSet subscription = ComplementSet.newSet(storeBackend.getSubscription()); - ComplementSet unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); - unassignedPartitionSet.removeAll(subscription); - storeBackend.unsubscribe(unassignedPartitionSet); + ComplementSet subscription = ComplementSet.newSet(storeBackend.getSubscription()); // [] // [1 2 3 4] + ComplementSet unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); // [ + // 0 + // 1 + // 1000000000 + // ] + unassignedPartitionSet.removeAll(subscription); // - [] + storeBackend.unsubscribe(unassignedPartitionSet); // [ 0 1 1000000000 ] storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version)); }); } diff --git a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java index 1f797dc8e4..972db84cdd 100644 --- a/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java +++ b/internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/DaVinciClientTest.java @@ -46,7 +46,9 @@ import com.linkedin.d2.balancer.D2Client; import com.linkedin.d2.balancer.D2ClientBuilder; +import com.linkedin.davinci.DaVinciBackend; import com.linkedin.davinci.DaVinciUserApp; +import com.linkedin.davinci.StoreBackend; import com.linkedin.davinci.client.AvroGenericDaVinciClient; import com.linkedin.davinci.client.DaVinciClient; import com.linkedin.davinci.client.DaVinciConfig; @@ -83,15 +85,7 @@ import com.linkedin.venice.serialization.avro.AvroProtocolDefinition; import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer; import com.linkedin.venice.store.rocksdb.RocksDBUtils; -import com.linkedin.venice.utils.DataProviderUtils; -import com.linkedin.venice.utils.ForkedJavaProcess; -import com.linkedin.venice.utils.IntegrationTestPushUtils; -import com.linkedin.venice.utils.Pair; -import com.linkedin.venice.utils.PropertyBuilder; -import com.linkedin.venice.utils.TestUtils; -import com.linkedin.venice.utils.TestWriteUtils; -import com.linkedin.venice.utils.Utils; -import com.linkedin.venice.utils.VeniceProperties; +import com.linkedin.venice.utils.*; import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils; import com.linkedin.venice.writer.VeniceWriter; import com.linkedin.venice.writer.VeniceWriterFactory; @@ -1286,8 +1280,6 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception { @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class) public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception { String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); - String storeName2 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); - String storeName3 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT); String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath(); VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true) .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1) @@ -1301,6 +1293,7 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti MetricsRepository metricsRepository = new MetricsRepository(); // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path + StoreBackend storeBackend; try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, @@ -1346,32 +1339,16 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti }); } - // Test multiple client ingesting different stores concurrently - DaVinciClient client2 = factory.getAndStartGenericAvroClient(storeName2, clientConfig); - DaVinciClient client3 = factory.getAndStartGenericAvroClient(storeName3, clientConfig); - CompletableFuture.allOf(client2.subscribeAll(), client3.subscribeAll()).get(); - assertEquals(client2.batchGet(keyValueMap.keySet()).get(), keyValueMap); - assertEquals(client3.batchGet(keyValueMap.keySet()).get(), keyValueMap); - - // TODO(jlliu): Re-enable this test-case after fixing store deletion that is flaky due to - // CLIENT_USE_SYSTEM_STORE_REPOSITORY. - // // Test read from a store that is being deleted concurrently - // try (ControllerClient controllerClient = cluster.getControllerClient()) { - // ControllerResponse response = controllerClient.disableAndDeleteStore(storeName2); - // assertFalse(response.isError(), response.getError()); - // TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> { - // assertThrows(VeniceClientException.class, () -> client2.get(KEY_COUNT / 3).get()); - // }); - // } - client2.unsubscribeAll(); + DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend(); + storeBackend = daVinciBackend.getStoreOrThrow(storeName1); + List partitions = new ArrayList(); + partitions.add(1); + partitions.add(2); + partitions.add(3); + partitions.add(4); + ComplementSet subscription = ComplementSet.newSet(partitions); + storeBackend.subscribe(subscription); } - - // Test bootstrap-time junk removal - cluster.useControllerClient(controllerClient -> { - ControllerResponse response = controllerClient.disableAndDeleteStore(storeName3); - assertFalse(response.isError(), response.getError()); - }); - // Test managed clients & data cleanup try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory( d2Client, @@ -1384,8 +1361,6 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Excepti DaVinciClient client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig); client1.subscribeAll().get(); client1.unsubscribeAll(); - // client2 was removed explicitly above via disableAndDeleteStore() - // client3 is expected to be removed by the factory during bootstrap TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> { assertEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0); });