Skip to content

Commit

Permalink
Added cache for file blobs that are already TTL removed to avoid repeated calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Shekhars committed Sep 25, 2023
1 parent 24e530d commit 388e803
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.ImmutableMap;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -90,6 +92,9 @@ public class BlobStoreUtil {
private final SnapshotIndexSerde snapshotIndexSerde;
private final RetryPolicyConfig retryPolicyConfig;

// Size-bounded cache (at most 1000 entries) of blob ids whose TTL was already removed, used to
// avoid repeated removeTTL calls for the same blob. Only the presence of a key is checked; the
// mapped value is a placeholder object. Entries may be evicted once the size limit is reached,
// in which case removeTTL is simply issued again for that blob id.
private final Cache<String, Object> ttlRemovedBlobIdsCache = CacheBuilder.newBuilder().maximumSize(1000).build();

public BlobStoreUtil(BlobStoreManager blobStoreManager, ExecutorService executor, BlobStoreConfig blobStoreConfig,
BlobStoreBackupManagerMetrics backupMetrics, BlobStoreRestoreManagerMetrics restoreMetrics) {
this.blobStoreManager = blobStoreManager;
Expand Down Expand Up @@ -651,7 +656,8 @@ public CompletionStage<Void> removeTTL(String indexBlobId, SnapshotIndex snapsho
* @return A future that completes when all the blobs associated with this dirIndex are marked to
* never expire.
*/
private CompletableFuture<Void> removeTTL(DirIndex dirIndex, Metadata metadata) {
@VisibleForTesting
CompletableFuture<Void> removeTTL(DirIndex dirIndex, Metadata metadata) {
String dirName = dirIndex.getDirName();
if (DirIndex.ROOT_DIR_NAME.equals(dirName)) {
LOG.debug("Removing TTL for files and dirs present in DirIndex for root dir.");
Expand All @@ -670,12 +676,18 @@ private CompletableFuture<Void> removeTTL(DirIndex dirIndex, Metadata metadata)
metadata.getJobName(), metadata.getJobId(), metadata.getTaskName(), metadata.getStoreName());
List<FileBlob> fileBlobs = file.getBlobs();
for (FileBlob fileBlob : fileBlobs) {
String opname = "removeTTL for fileBlob: " + file.getFileName() + " with blobId: {}" + fileBlob.getBlobId();
Supplier<CompletionStage<Void>> ttlRemovalAction = () ->
blobStoreManager.removeTTL(fileBlob.getBlobId(), requestMetadata).toCompletableFuture();
CompletableFuture<Void> ttlRemovalFuture =
FutureUtil.executeAsyncWithRetries(opname, ttlRemovalAction, isCauseNonRetriable(), executor, retryPolicyConfig);
updateTTLsFuture.add(ttlRemovalFuture);
if (ttlRemovedBlobIdsCache.getIfPresent(fileBlob.getBlobId()) == null) {
String opname = "removeTTL for fileBlob: " + file.getFileName() + " with blobId: {}" + fileBlob.getBlobId();
Supplier<CompletionStage<Void>> ttlRemovalAction = () ->
blobStoreManager.removeTTL(fileBlob.getBlobId(), requestMetadata).toCompletableFuture()
.thenRun(() -> ttlRemovedBlobIdsCache.put(fileBlob.getBlobId(), new Object()));
CompletableFuture<Void> ttlRemovalFuture =
FutureUtil.executeAsyncWithRetries(opname, ttlRemovalAction, isCauseNonRetriable(), executor, retryPolicyConfig);

updateTTLsFuture.add(ttlRemovalFuture);
} else {
LOG.debug("TTL already removed for blobId: {}. Not removing again", fileBlob.getBlobId());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1034,6 +1034,25 @@ public void testPutFileRetriedMoreThanThreeTimes() throws Exception {
verify(blobStoreManager, times(4)).put(any(InputStream.class), any(Metadata.class));
}

/**
 * Verifies that the TTL of a blob is removed from the blob store only once, even when the same
 * blob id is referenced by several dir indexes.
 *
 * Two dir indexes are built, each holding a single file whose only blob has the id "test".
 * removeTTL is invoked for both; only the first invocation is expected to reach the
 * BlobStoreManager, the second one should be short-circuited.
 */
@Test
public void testBlobTTLRemovedOnlyOnce() {
  Metadata metadata = new Metadata("test", Optional.of(100L), "job", "jobId", "task", "store");
  BlobStoreManager blobStoreManager = mock(BlobStoreManager.class);
  when(blobStoreManager.removeTTL(any(String.class), any(Metadata.class)))
      .thenReturn(CompletableFuture.completedFuture(null));
  BlobStoreUtil blobStoreUtil =
      new BlobStoreUtil(blobStoreManager, EXECUTOR, blobStoreConfig, null, null);

  // Both file indexes point at a blob with the same id, so they refer to the same blob.
  FileBlob fileBlob1 = new FileBlob("test", 0);
  FileIndex fileIndex1 = new FileIndex("mock1", Collections.singletonList(fileBlob1),
      new FileMetadata(0, 0, 10, "root", "root", "666"), 0);
  FileBlob fileBlob2 = new FileBlob("test", 0);
  FileIndex fileIndex2 = new FileIndex("mock2", Collections.singletonList(fileBlob2),
      new FileMetadata(0, 0, 10, "root", "root", "666"), 0);
  DirIndex dirIndex1 = new DirIndex("root", Collections.singletonList(fileIndex1),
      Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
  DirIndex dirIndex2 = new DirIndex("root", Collections.singletonList(fileIndex2),
      Collections.emptyList(), Collections.emptyList(), Collections.emptyList());

  // Wait for each removal to finish before moving on. The blob id is recorded as "TTL removed"
  // only after the removeTTL call handed to the executor has completed, so without joining the
  // second call could start before the first one has been recorded, and the verification below
  // could run before the work is done. join() also surfaces any failure of the removal.
  blobStoreUtil.removeTTL(dirIndex1, metadata).join();
  blobStoreUtil.removeTTL(dirIndex2, metadata).join();

  verify(blobStoreManager, times(1)).removeTTL(any(String.class), any(Metadata.class));
}

private CheckpointV2 createCheckpointV2(String stateBackendFactory, Map<String, String> storeSnapshotIndexBlobIds) {
CheckpointId checkpointId = CheckpointId.create();
Map<String, Map<String, String>> factoryStoreSCMs = new HashMap<>();
Expand Down

0 comments on commit 388e803

Please sign in to comment.