Skip to content

Commit

Permalink
Partition Removal
Browse files Browse the repository at this point in the history
  • Loading branch information
kristyelee committed Dec 5, 2024
1 parent e7bf0c7 commit 2055b04
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,10 @@ private synchronized void bootstrap() {
AbstractStorageEngine storageEngine = storageService.getStorageEngine(versionTopic);
aggVersionedStorageEngineStats.setStorageEngine(versionTopic, storageEngine);
StoreBackend storeBackend = getStoreOrThrow(storeName);
ComplementSet<Integer> persistedPartitionIds = ComplementSet.newSet(storageEngine.getPersistedPartitionIds());
storeBackend.unsubscribe(persistedPartitionIds);
ComplementSet<Integer> subscription = ComplementSet.newSet(storeBackend.getSubscription());
ComplementSet<Integer> unassignedPartitionSet = ComplementSet.newSet(storageEngine.getPersistedPartitionIds());
unassignedPartitionSet.removeAll(subscription);
storeBackend.unsubscribe(unassignedPartitionSet);
storeBackend.subscribe(ComplementSet.newSet(partitions), Optional.of(version));
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1288,7 +1288,6 @@ public void testBootstrapSubscription(DaVinciConfig daVinciConfig) throws Except

try (DaVinciClient<Integer, Object> client = ServiceFactory
.getGenericAvroDaVinciClient(storeName, cluster.getZk().getAddress(), daVinciConfig, backendConfig)) {
// We only subscribe to 1/3 of the partitions so some data will not be present locally.
client.subscribe(Collections.singleton(0)).get();
assertThrows(() -> client.batchGet(keySet).get());
}
Expand Down

0 comments on commit 2055b04

Please sign in to comment.