[da-vinci][dvc] Dropping unassigned partitions #1332

Draft · wants to merge 20 commits into main
@@ -1,5 +1,6 @@
package com.linkedin.davinci;

import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.PUSH_STATUS_INSTANCE_NAME_SUFFIX;
import static com.linkedin.venice.ConfigKeys.VALIDATE_VENICE_INTERNAL_SCHEMA_VERSION;
import static com.linkedin.venice.pushmonitor.ExecutionStatus.DVC_INGESTION_ERROR_DISK_FULL;
@@ -207,6 +208,7 @@ public DaVinciBackend(
String pid = Utils.getPid();
String instanceSuffix =
configLoader.getCombinedProperties().getString(PUSH_STATUS_INSTANCE_NAME_SUFFIX, (pid == null ? "NA" : pid));
// Current instance name.
String instanceName = Utils.getHostName() + "_" + instanceSuffix;

// Fetch latest update schema's protocol ID for Push Status Store from Router.
@@ -466,16 +468,18 @@ private synchronized void bootstrap() {
configLoader.getVeniceServerConfig());
ingestionBackend.addIngestionNotifier(ingestionListener);

-    // Subscribe all bootstrap version partitions.
-    storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
-      List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
-      String versionTopic = version.kafkaTopicName();
-      LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
-      AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
-      aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
-      StoreBackend storeBackend = getStoreOrThrow(storeName);
-      storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
-    });
+    if (configLoader.getCombinedProperties().getBoolean(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)) {
+      // Subscribe all bootstrap version partitions.
+      storeNameToBootstrapVersionMap.forEach((storeName, version) -> {
+        List<Integer> partitions = storeNameToPartitionListMap.get(storeName);
+        String versionTopic = version.kafkaTopicName();
+        LOGGER.info("Bootstrapping partitions {} for {}", partitions, versionTopic);
+        AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
+        aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
+        StoreBackend storeBackend = getStoreOrThrow(storeName);
+        storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
+      });
+    }
}

@Override
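With the flag left at its default of true, bootstrap() keeps the old behavior and subscribes every bootstrap version partition found on disk; setting it to false leaves subscription entirely to the application. A minimal sketch of opting out, mirroring the client API used in the tests below (the store name and partition number are hypothetical, and d2Client is assumed to be in scope):

```java
// Backend config with automatic on-disk partition subscription disabled.
VeniceProperties backendConfig = new PropertyBuilder()
    .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, false)
    .build();
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
    d2Client,
    VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
    new MetricsRepository(),
    backendConfig)) {
  DaVinciClient<Integer, Object> client = factory.getAndStartGenericAvroClient("my-store", new DaVinciConfig());
  // Nothing was resubscribed at startup; the application picks its partitions.
  client.subscribe(Collections.singleton(0)).get();
}
```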
@@ -108,6 +108,10 @@ public StoreBackendStats getStats() {
return stats;
}

public ComplementSet<Integer> getSubscription() {
return subscription;
}

public ReferenceCounted<VersionBackend> getDaVinciCurrentVersion() {
return daVinciCurrentVersionRef.get();
}
@@ -154,6 +158,7 @@ synchronized CompletableFuture<Void> subscribe(
// Recreate store config that was potentially deleted by unsubscribe.
config.store();
}

subscription.addAll(partitions);

if (daVinciFutureVersion == null) {
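getSubscription() exposes which partitions the store is currently subscribed to. Purely as an illustration of how it could serve this PR's goal of dropping unassigned partitions, a hypothetical sketch (assuming ComplementSet#contains; getOnDiskPartitions and dropUnassignedPartition are made-up helpers, not part of this change):

```java
// Drop on-disk partitions that the client never subscribed to.
ComplementSet<Integer> subscription = storeBackend.getSubscription();
for (int partition : getOnDiskPartitions(storeBackend)) { // hypothetical helper
  if (!subscription.contains(partition)) {
    dropUnassignedPartition(storeBackend, partition); // hypothetical helper
  }
}
```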
@@ -10,6 +10,7 @@
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_LEVEL0_STOPS_WRITES_TRIGGER_WRITE_ONLY_VERSION;
import static com.linkedin.davinci.store.rocksdb.RocksDBServerConfig.ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED;
import static com.linkedin.venice.ConfigKeys.CLUSTER_NAME;
import static com.linkedin.venice.ConfigKeys.DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY;
import static com.linkedin.venice.ConfigKeys.INGESTION_MEMORY_LIMIT;
import static com.linkedin.venice.ConfigKeys.INGESTION_USE_DA_VINCI_CLIENT;
import static com.linkedin.venice.ConfigKeys.KAFKA_ADMIN_CLASS;
@@ -710,6 +711,7 @@ private VeniceConfigLoader buildVeniceConfig() {
.put(CLUSTER_NAME, clusterName)
.put(ZOOKEEPER_ADDRESS, zkAddress)
.put(KAFKA_BOOTSTRAP_SERVERS, kafkaBootstrapServers)
.put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, true)
.put(ROCKSDB_PLAIN_TABLE_FORMAT_ENABLED, daVinciConfig.getStorageClass() == StorageClass.MEMORY_BACKED_BY_DISK)
.put(INGESTION_USE_DA_VINCI_CLIENT, true)
.put(
@@ -97,6 +97,10 @@ private ConfigKeys() {
public static final String DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_QUOTA_BYTES_PER_SECOND =
"da.vinci.current.version.bootstrapping.quota.bytes.per.second";

// Controls whether a Da Vinci client automatically subscribes to partitions found on disk during bootstrap.
public static final String DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY =
"da.vinci.subscribe.on.disk.partitions.automatically";

// Unordered throttlers aren't compatible with Shared Kafka Consumer and have no effect when Shared Consumer is used.
public static final String KAFKA_FETCH_QUOTA_UNORDERED_BYTES_PER_SECOND =
"kafka.fetch.quota.unordered.bytes.per.second";
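Like the other keys in this file, the new constant maps to a plain property name, so it can also be supplied through any properties source, e.g.:

```
da.vinci.subscribe.on.disk.partitions.automatically=false
```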
@@ -1283,6 +1283,94 @@ public void testBootstrap(DaVinciConfig daVinciConfig) throws Exception {
}
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testBootstrapSubscription(DaVinciConfig clientConfig) throws Exception {

> Contributor comment: Since DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY is already true by default, we should instead write a test where it's set to false.

String storeName1 = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
VeniceProperties backendConfig = new PropertyBuilder().put(CLIENT_USE_SYSTEM_STORE_REPOSITORY, true)
.put(CLIENT_SYSTEM_STORE_REPOSITORY_REFRESH_INTERVAL_SECONDS, 1)
.put(DATA_BASE_PATH, baseDataPath)
.put(PERSISTENCE_TYPE, ROCKS_DB)
.put(DA_VINCI_CURRENT_VERSION_BOOTSTRAPPING_SPEEDUP_ENABLED, true)
.put(PUSH_STATUS_STORE_ENABLED, true)
.put(DAVINCI_PUSH_STATUS_CHECK_INTERVAL_IN_MS, 1000)
.build();

MetricsRepository metricsRepository = new MetricsRepository();

// Start a client against the shared MetricsRepository and base data path
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
metricsRepository,
backendConfig)) {
DaVinciClient<Integer, Object> client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig);

// Test non-existent key access
client1.subscribeAll().get();
assertNull(client1.get(KEY_COUNT + 1).get());

// Test single-get access
Map<Integer, Integer> keyValueMap = new HashMap<>();
for (int k = 0; k < KEY_COUNT; ++k) {
assertEquals(client1.get(k).get(), 1);
keyValueMap.put(k, 1);
}

// Test batch-get access
assertEquals(client1.batchGet(keyValueMap.keySet()).get(), keyValueMap);

// Test automatic new version ingestion
for (int i = 0; i < 2; ++i) {
// Test per-version partitioning parameters
int partitionCount = i + 1;
String iString = String.valueOf(i);
cluster.useControllerClient(controllerClient -> {
ControllerResponse response = controllerClient.updateStore(
storeName1,
new UpdateStoreQueryParams().setPartitionerClass(ConstantVenicePartitioner.class.getName())
.setPartitionCount(partitionCount)
.setPartitionerParams(
Collections.singletonMap(ConstantVenicePartitioner.CONSTANT_PARTITION, iString)));
assertFalse(response.isError(), response.getError());
});

Integer expectedValue = cluster.createVersion(storeName1, KEY_COUNT);
TestUtils.waitForNonDeterministicAssertion(TEST_TIMEOUT, TimeUnit.MILLISECONDS, () -> {
for (int k = 0; k < KEY_COUNT; ++k) {
Object readValue = client1.get(k).get();
assertEquals(readValue, expectedValue);
}
});
}
}

// Test managed clients & data cleanup
try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
d2Client,
VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME,
new MetricsRepository(),
backendConfig,
Optional.of(Collections.singleton(storeName1)))) {
assertNotEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0);

DaVinciClient<Integer, Object> client1 = factory.getAndStartGenericAvroClient(storeName1, clientConfig);

Set<Integer> partitions = new HashSet<>();
for (int i = 0; i < 5; ++i) {
partitions.add(i);
}

client1.subscribe(partitions);

client1.subscribeAll().get();
client1.unsubscribeAll();
TestUtils.waitForNonDeterministicAssertion(60, TimeUnit.SECONDS, true, () -> {
assertEquals(FileUtils.sizeOfDirectory(new File(baseDataPath)), 0);
});
}
}

@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testPartialSubscription(DaVinciConfig daVinciConfig) throws Exception {
String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
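Following up on the contributor comment above, a rough sketch of the suggested variant with the flag set to false (the test name, the restart flow, and the failing-read assertion are assumptions about the flag's intended semantics, not code from this draft):

```java
@Test(timeOut = TEST_TIMEOUT, dataProvider = "dv-client-config-provider", dataProviderClass = DataProviderUtils.class)
public void testNoAutomaticSubscriptionOnBootstrap(DaVinciConfig clientConfig) throws Exception {
  String storeName = createStoreWithMetaSystemStoreAndPushStatusSystemStore(KEY_COUNT);
  String baseDataPath = Utils.getTempDataDirectory().getAbsolutePath();
  VeniceProperties backendConfig = new PropertyBuilder().put(DATA_BASE_PATH, baseDataPath)
      .put(PERSISTENCE_TYPE, ROCKS_DB)
      .put(DA_VINCI_SUBSCRIBE_ON_DISK_PARTITIONS_AUTOMATICALLY, false)
      .build();

  // First session: ingest normally so that partition data lands on disk.
  try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
      d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, new MetricsRepository(), backendConfig)) {
    DaVinciClient<Integer, Object> client = factory.getAndStartGenericAvroClient(storeName, clientConfig);
    client.subscribeAll().get();
    assertEquals(client.get(0).get(), 1);
  }

  // Second session on the same data path: with the flag off, the on-disk
  // partitions should not be resubscribed automatically, so a read should
  // fail until the application subscribes again.
  try (CachingDaVinciClientFactory factory = new CachingDaVinciClientFactory(
      d2Client, VeniceRouterWrapper.CLUSTER_DISCOVERY_D2_SERVICE_NAME, new MetricsRepository(), backendConfig)) {
    DaVinciClient<Integer, Object> client = factory.getAndStartGenericAvroClient(storeName, clientConfig);
    assertThrows(() -> client.get(0).get());
    client.subscribeAll().get();
    assertEquals(client.get(0).get(), 1);
  }
}
```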