Integration Test [In Writing]
kristyelee committed Dec 9, 2024
1 parent b520ab8 commit b94b49a
Showing 2 changed files with 22 additions and 43 deletions.
@@ -469,16 +469,20 @@ private synchronized void bootstrap() {
 
 // Subscribe all bootstrap version partitions.
 storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
-List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
+List<Integer> partitions = storeNameToPartitionListMap.get(storeName); // [0 1]
 String versionTopic = version.kafkaTopicName();
 LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
 AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
 aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
 StoreBackend storeBackend = getStoreOrThrow(storeName);
-ComplementSet<Integer> subscription = ComplementSet.newSet(storeBackend.getSubscription());
-ComplementSet<Integer> unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds());
-unassignedPartitionSet.removeAll(subscription);
-storeBackend.unsubscribe(unassignedPartitionSet);
+ComplementSet<Integer> subscription = ComplementSet.newSet(storeBackend.getSubscription()); // [] // [1 2 3 4]
+ComplementSet<Integer> unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds()); // [
+// 0
+// 1
+// 1000000000
+// ]
+unassignedPartitionSet.removeAll(subscription); // - []
+storeBackend.unsubscribe(unassignedPartitionSet); // [ 0 1 1000000000 ]
 storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
 });
 }
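
The inline trace comments added in this hunk walk through one concrete bootstrap: the storage engine reports persisted partitions 0, 1 and what looks like a junk id (1000000000), the StoreBackend's current subscription is empty (or [1 2 3 4] on a later run), so every persisted partition that is not subscribed is treated as unassigned and unsubscribed before the bootstrap list [0 1] is subscribed. Below is a minimal standalone sketch of that set arithmetic using plain java.util sets instead of Venice's ComplementSet; the partition ids are illustrative, copied from the trace.

import java.util.Set;
import java.util.TreeSet;

public class BootstrapCleanupSketch {
  public static void main(String[] args) {
    // Partitions found on disk for the version topic (per the trace: 0, 1 and a junk id).
    Set<Integer> persisted = new TreeSet<>(Set.of(0, 1, 1000000000));
    // Partitions the StoreBackend is currently subscribed to (empty on a fresh bootstrap).
    Set<Integer> subscription = new TreeSet<>();

    // unassignedPartitionSet = persisted - subscription: on-disk partitions nobody is subscribed to.
    Set<Integer> unassigned = new TreeSet<>(persisted);
    unassigned.removeAll(subscription);
    System.out.println("unsubscribe " + unassigned); // unsubscribe [0, 1, 1000000000]

    // The bootstrap partition list from storeNameToPartitionListMap is then subscribed.
    Set<Integer> bootstrapPartitions = new TreeSet<>(Set.of(0, 1));
    System.out.println("subscribe " + bootstrapPartitions); // subscribe [0, 1]
  }
}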
@@ -46,7 +46,9 @@
 
 import com.linkedin.d2.balancer.D2Client;
 import com.linkedin.d2.balancer.D2ClientBuilder;
+import com.linkedin.davinci.DaVinciBackend;
 import com.linkedin.davinci.DaVinciUserApp;
+import com.linkedin.davinci.StoreBackend;
 import com.linkedin.davinci.client.AvroGenericDaVinciClient;
 import com.linkedin.davinci.client.DaVinciClient;
 import com.linkedin.davinci.client.DaVinciConfig;
@@ -83,15 +85,7 @@
 import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
 import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
 import com.linkedin.venice.store.rocksdb.RocksDBUtils;
-import com.linkedin.venice.utils.DataProviderUtils;
-import com.linkedin.venice.utils.ForkedJavaProcess;
-import com.linkedin.venice.utils.IntegrationTestPushUtils;
-import com.linkedin.venice.utils.Pair;
-import com.linkedin.venice.utils.PropertyBuilder;
-import com.linkedin.venice.utils.TestUtils;
-import com.linkedin.venice.utils.TestWriteUtils;
-import com.linkedin.venice.utils.Utils;
-import com.linkedin.venice.utils.VeniceProperties;
+import com.linkedin.venice.utils.*;
 import com.linkedin.venice.utils.metrics.MetricsRepositoryUtils;
 import com.linkedin.venice.writer.VeniceWriter;
 import com.linkedin.venice.writer.VeniceWriterFactory;
@@ -1286,8 +1280,6 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception {
 @Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
 public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception {
 String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
-String storeName2 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
-String storeName3 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
 String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
 VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
 .put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
@@ -1301,6 +1293,7 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception {
 MetricsRepository metricsRepository = new MetricsRepository();
 
 // Test multiple clients sharing the same ClientConfig/MetricsRepository & base data path
+StoreBackend storeBackend;
 try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
 d2Client,
 VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
@@ -1346,32 +1339,16 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception {
 });
 }
 
-// Test multiple client ingesting different stores concurrently
-DaVinciClient<Integer, Integer> client2 = factory.getAndStartGenericAvroClient(storeName2, clientConfig);
-DaVinciClient<Integer, Integer> client3 = factory.getAndStartGenericAvroClient(storeName3, clientConfig);
-CompletableFuture.allOf(client2.subscribeAll(), client3.subscribeAll()).get();
-assertEquals(client2.batchGet(keyValueMap.keySet()).get(), keyValueMap);
-assertEquals(client3.batchGet(keyValueMap.keySet()).get(), keyValueMap);
-
-// TODO(jlliu): Re-enable this test-case after fixing store deletion that is flaky due to
-// CLIENT_USE_SYSTEM_STORE_REPOSITORY.
-// // Test read from a store that is being deleted concurrently
-// try (ControllerClient controllerClient = cluster.getControllerClient()) {
-// ControllerResponse response = controllerClient.disableAndDeleteStore(storeName2);
-// assertFalse(response.isError(), response.getError());
-// TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
-// assertThrows(VeniceClientException.class, () -> client2.get(KEY_COUNT / 3).get());
-// });
-// }
-client2.unsubscribeAll();
+DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend();
+storeBackend = daVinciBackend.getStoreOrThrow(storeName1);
+List<Integer> partitions = new ArrayList<Integer>();
+partitions.add(1);
+partitions.add(2);
+partitions.add(3);
+partitions.add(4);
+ComplementSet<Integer> subscription = ComplementSet.newSet(partitions);
+storeBackend.subscribe(subscription);
 }
-
-// Test bootstrap-time junk removal
-cluster.useControllerClient(controllerClient -> {
-ControllerResponse response = controllerClient.disableAndDeleteStore(storeName3);
-assertFalse(response.isError(), response.getError());
-});
-
 // Test managed clients & data cleanup
 try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
 d2Client,
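
The block added in the hunk above drives the backend directly instead of going through the DaVinciClient API: it fetches the process-wide DaVinciBackend via AvroGenericDaVinciClient.getBackend(), looks up the StoreBackend for storeName1, and subscribes partitions 1 through 4, which matches the [1 2 3 4] subscription traced in the bootstrap hunk of the first file. A more compact sketch of the same steps, assuming java.util.Arrays is imported and ComplementSet resolves through the com.linkedin.venice.utils.* wildcard import added above (an illustrative fragment meant to sit inside the same try block, not the commit's code):

// Same subscription as above, with the list built in one call instead of repeated add()s.
DaVinciBackend daVinciBackend = AvroGenericDaVinciClient.getBackend();
storeBackend = daVinciBackend.getStoreOrThrow(storeName1);
List<Integer> partitions = Arrays.asList(1, 2, 3, 4); // partitions 1-4, as in the hunk above
storeBackend.subscribe(ComplementSet.newSet(partitions));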
@@ -1384,8 +1361,6 @@ public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception {
 DaVinciClient<Integer, Object> client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig);
 client1.subscribeAll().get();
 client1.unsubscribeAll();
-// client2 was removed explicitly above via disableAndDeleteStore()
-// client3 is expected to be removed by the factory during bootstrap
 TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
 assertEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0);
 });