Optimize remote store GC flow with pinned timestamps (#15943)
Signed-off-by: Sachin Kale <[email protected]>
sachinpkale authored Sep 25, 2024
1 parent 12dadcf commit dc4dbce
Showing 11 changed files with 854 additions and 227 deletions.

@@ -854,7 +854,7 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}

// Check last fetch status of pinned timestamps. If stale, return.
if (RemoteStoreUtils.isPinnedTimestampStateStale()) {
if (lastNMetadataFilesToKeep != 0 && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
return;
}
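
For context, a minimal sketch of what the staleness guard amounts to, assuming the check is a simple age comparison against a configured window; the method and parameter names below are illustrative, not the actual RemoteStoreUtils implementation.

    // Sketch only: pinned-timestamp state is treated as stale when the last successful
    // fetch of pinned timestamps is older than the allowed window, and GC bails out.
    static boolean isPinnedTimestampStateStaleSketch(long lastSuccessfulFetchMillis, long maxAllowedAgeMillis) {
        return System.currentTimeMillis() - lastSuccessfulFetchMillis > maxAllowedAgeMillis;
    }

With the change above, a forced pass with lastNMetadataFilesToKeep == 0 (as issued from remoteDirectoryCleanup below) no longer bails out when that fetch is stale.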
@@ -994,7 +994,8 @@ public static void remoteDirectoryCleanup(
String remoteStoreRepoForIndex,
String indexUUID,
ShardId shardId,
RemoteStorePathStrategy pathStrategy
RemoteStorePathStrategy pathStrategy,
boolean forceClean
) {
try {
RemoteSegmentStoreDirectory remoteSegmentStoreDirectory = (RemoteSegmentStoreDirectory) remoteDirectoryFactory.newDirectory(
@@ -1003,8 +1004,12 @@
shardId,
pathStrategy
);
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
if (forceClean) {
remoteSegmentStoreDirectory.delete();
} else {
remoteSegmentStoreDirectory.deleteStaleSegments(0);
remoteSegmentStoreDirectory.deleteIfEmpty();
}
} catch (Exception e) {
staticLogger.error("Exception occurred while deleting directory", e);
}
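
A hypothetical call to the updated helper, to show how the new flag is threaded through; the argument variables are the ones from the signature above and the concrete value is illustrative.

    // forceClean == true deletes the shard's remote segment directory outright;
    // forceClean == false keeps the earlier behaviour: delete stale segments, then
    // delete the directory only if it ends up empty.
    RemoteSegmentStoreDirectory.remoteDirectoryCleanup(
        remoteDirectoryFactory,
        remoteStoreRepoForIndex,
        indexUUID,
        shardId,
        pathStrategy,
        true // forceClean
    );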
@@ -1023,7 +1028,10 @@ private boolean deleteIfEmpty() throws IOException {
logger.info("Remote directory still has files, not deleting the path");
return false;
}
return delete();
}

private boolean delete() {
try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
@@ -34,6 +34,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BooleanSupplier;
import java.util.function.LongConsumer;
@@ -61,6 +62,7 @@ public class RemoteFsTimestampAwareTranslog extends RemoteFsTranslog {
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFileGenerationMap;
private final Map<String, Tuple<Long, Long>> oldFormatMetadataFilePrimaryTermMap;
private final AtomicLong minPrimaryTermInRemote = new AtomicLong(Long.MAX_VALUE);
private final AtomicBoolean triggerTrimOnMinRemoteGenReferencedChange = new AtomicBoolean(false);

public RemoteFsTimestampAwareTranslog(
TranslogConfig config,
@@ -105,6 +107,11 @@ protected void onDelete() {
}
}

@Override
protected void onMinRemoteGenReferencedChange() {
triggerTrimOnMinRemoteGenReferencedChange.set(true);
}

@Override
public void trimUnreferencedReaders() throws IOException {
trimUnreferencedReaders(false, true);
@@ -135,14 +142,22 @@ protected void trimUnreferencedReaders(boolean indexDeleted, boolean trimLocal)

// This is to ensure that after the permits are acquired during primary relocation, there are no further modifications on the remote
// store.
if (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get()) {
if (indexDeleted == false && (startedPrimarySupplier.getAsBoolean() == false || pauseSync.get())) {
return;
}

// This is to fail fast and avoid listing md files unnecessarily.
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
return;
}

// This code block ensures parity with RemoteFsTranslog. Without this, we would end up making a list-translog-metadata
// call on each invocation of trimUnreferencedReaders.
if (indexDeleted == false && triggerTrimOnMinRemoteGenReferencedChange.get() == false) {
return;
} else if (triggerTrimOnMinRemoteGenReferencedChange.get()) {
triggerTrimOnMinRemoteGenReferencedChange.set(false);
}
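
The flag logic above is easier to see in isolation; this is a minimal standalone sketch of the pattern (class and method names are illustrative), assuming only that uploads advance minRemoteGenReferenced and that a non-deleted index should trim at most once per advance.

    import java.util.concurrent.atomic.AtomicBoolean;

    class TrimTriggerSketch {
        private final AtomicBoolean pending = new AtomicBoolean(false);

        // Corresponds to onMinRemoteGenReferencedChange(): set after a successful upload
        // moves minRemoteGenReferenced forward.
        void markPending() {
            pending.set(true);
        }

        // Corresponds to the guard in trimUnreferencedReaders(): consume the flag, then
        // proceed either because the index is deleted or because something changed.
        boolean shouldTrim(boolean indexDeleted) {
            boolean triggered = pending.getAndSet(false);
            return indexDeleted || triggered;
        }
    }

Without this gate, every invocation of trimUnreferencedReaders would issue a remote metadata listing (listTranslogMetadataFilesAsync) even when nothing new can be trimmed.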

// Since remote generation deletion is async, this ensures that only one generation deletion happens at a time.
Expand All @@ -158,24 +173,20 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.size() <= 1) {
if (indexDeleted == false && metadataFiles.size() <= 1) {
logger.debug("No stale translog metadata files found");
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
return;
}

// Check last fetch status of pinned timestamps. If stale, return.
if (indexDeleted == false && RemoteStoreUtils.isPinnedTimestampStateStale()) {
logger.warn("Skipping remote segment store garbage collection as last fetch of pinned timestamp is stale");
logger.warn("Skipping remote translog garbage collection as last fetch of pinned timestamp is stale");
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
return;
}

List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
metadataFiles,
metadataFilePinnedTimestampMap,
logger
);
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, indexDeleted);

// If index is not deleted, make sure to keep latest metadata file
if (indexDeleted == false) {
@@ -194,10 +205,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);

logger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

Set<Long> generationsToBeDeleted = getGenerationsToBeDeleted(
metadataFilesNotToBeDeleted,
metadataFilesToBeDeleted,
indexDeleted
indexDeleted ? Long.MAX_VALUE : getMinGenerationToKeepInRemote()
);

logger.debug(() -> "generationsToBeDeleted = " + generationsToBeDeleted);
@@ -208,7 +220,11 @@ public void onResponse(List<BlobMetadata> blobMetadata) {
generationsToBeDeleted,
remoteGenerationDeletionPermits::release
);
} else {
remoteGenerationDeletionPermits.release();
}

if (metadataFilesToBeDeleted.isEmpty() == false) {
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(
metadataFilesToBeDeleted,
Expand All @@ -217,11 +233,10 @@ public void onResponse(List<BlobMetadata> blobMetadata) {

// Update cache to keep only those metadata files that are not getting deleted
oldFormatMetadataFileGenerationMap.keySet().retainAll(metadataFilesNotToBeDeleted);

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(metadataFilesNotToBeDeleted);
} else {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
remoteGenerationDeletionPermits.release();
}
} catch (Exception e) {
remoteGenerationDeletionPermits.release(REMOTE_DELETION_PERMITS);
@@ -237,18 +252,16 @@ public void onFailure(Exception e) {
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}

private long getMinGenerationToKeepInRemote() {
return minRemoteGenReferenced - indexSettings().getRemoteTranslogExtraKeep();
}

// Visible for testing
protected Set<Long> getGenerationsToBeDeleted(
List<String> metadataFilesNotToBeDeleted,
List<String> metadataFilesToBeDeleted,
boolean indexDeleted
long minGenerationToKeepInRemote
) throws IOException {
long maxGenerationToBeDeleted = Long.MAX_VALUE;

if (indexDeleted == false) {
maxGenerationToBeDeleted = minRemoteGenReferenced - 1 - indexSettings().getRemoteTranslogExtraKeep();
}

Set<Long> generationsFromMetadataFilesToBeDeleted = new HashSet<>();
for (String mdFile : metadataFilesToBeDeleted) {
Tuple<Long, Long> minMaxGen = getMinMaxTranslogGenerationFromMetadataFile(mdFile, translogTransferManager);
@@ -262,21 +275,31 @@ protected Set<Long> getGenerationsToBeDeleted(
Set<Long> generationsToBeDeleted = new HashSet<>();
for (long generation : generationsFromMetadataFilesToBeDeleted) {
// Check if the generation is not referred by metadata file matching pinned timestamps
if (generation <= maxGenerationToBeDeleted && isGenerationPinned(generation, pinnedGenerations) == false) {
// The check with minGenerationToKeep is redundant but kept to make sure we don't delete generations
// that are not persisted in remote segment store yet.
if (generation < minGenerationToKeepInRemote && isGenerationPinned(generation, pinnedGenerations) == false) {
generationsToBeDeleted.add(generation);
}
}
return generationsToBeDeleted;
}
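
A worked example of the retention math used above may help; the numbers are illustrative, and getMinGenerationToKeepInRemote() is the helper defined earlier (minRemoteGenReferenced minus the configured extra generations to keep).

    // Worked example of the retention predicate; numbers are illustrative.
    class GenerationRetentionExample {
        public static void main(String[] args) {
            long minRemoteGenReferenced = 100;
            long remoteTranslogExtraKeep = 5; // value returned by indexSettings().getRemoteTranslogExtraKeep()
            long minGenerationToKeepInRemote = minRemoteGenReferenced - remoteTranslogExtraKeep; // 95

            // Generation 90 is below the keep threshold and, in this example, not pinned, so it is
            // eligible for deletion; generation 97 is at or above the threshold and is retained.
            for (long generation : new long[] { 90, 97 }) {
                boolean pinned = false; // stand-in for isGenerationPinned(generation, pinnedGenerations)
                boolean deletable = generation < minGenerationToKeepInRemote && pinned == false;
                System.out.println("generation=" + generation + " deletable=" + deletable);
            }
        }
    }

When the index is deleted, the caller passes Long.MAX_VALUE instead of the computed minimum, so every generation that is not pinned becomes eligible for deletion.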

protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles) {
return getMetadataFilesToBeDeleted(metadataFiles, metadataFilePinnedTimestampMap, logger);
protected List<String> getMetadataFilesToBeDeleted(List<String> metadataFiles, boolean indexDeleted) {
return getMetadataFilesToBeDeleted(
metadataFiles,
metadataFilePinnedTimestampMap,
getMinGenerationToKeepInRemote(),
indexDeleted,
logger
);
}

// Visible for testing
protected static List<String> getMetadataFilesToBeDeleted(
List<String> metadataFiles,
Map<Long, String> metadataFilePinnedTimestampMap,
long minGenerationToKeepInRemote,
boolean indexDeleted,
Logger logger
) {
Tuple<Long, Set<Long>> pinnedTimestampsState = RemoteStorePinnedTimestampService.getPinnedTimestamps();
@@ -312,6 +335,22 @@ protected static List<String> getMetadataFilesToBeDeleted(
metadataFilesToBeDeleted.size()
);

if (indexDeleted == false) {
// Filter out metadata files based on minGenerationToKeep
List<String> metadataFilesContainingMinGenerationToKeep = metadataFilesToBeDeleted.stream().filter(md -> {
long maxGeneration = TranslogTransferMetadata.getMaxGenerationFromFileName(md);
return maxGeneration == -1 || maxGeneration >= minGenerationToKeepInRemote;
}).collect(Collectors.toList());
metadataFilesToBeDeleted.removeAll(metadataFilesContainingMinGenerationToKeep);

logger.trace(
"metadataFilesContainingMinGenerationToKeep.size = {}, metadataFilesToBeDeleted based on minGenerationToKeep filtering = {}, minGenerationToKeep = {}",
metadataFilesContainingMinGenerationToKeep.size(),
metadataFilesToBeDeleted.size(),
minGenerationToKeepInRemote
);
}

return metadataFilesToBeDeleted;
}

@@ -472,50 +511,60 @@ protected static Tuple<Long, Long> getMinMaxPrimaryTermFromMetadataFile(
}
}

public static void cleanup(TranslogTransferManager translogTransferManager) throws IOException {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());
public static void cleanupOfDeletedIndex(TranslogTransferManager translogTransferManager, boolean forceClean) throws IOException {
if (forceClean) {
translogTransferManager.delete();
} else {
ActionListener<List<BlobMetadata>> listMetadataFilesListener = new ActionListener<>() {
@Override
public void onResponse(List<BlobMetadata> blobMetadata) {
List<String> metadataFiles = blobMetadata.stream().map(BlobMetadata::name).collect(Collectors.toList());

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(
metadataFiles,
new HashMap<>(),
Long.MAX_VALUE,
true, // This method gets called when the index is no longer present
staticLogger
);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);

try {
if (metadataFiles.isEmpty()) {
staticLogger.debug("No stale translog metadata files found");
return;
}
List<String> metadataFilesToBeDeleted = getMetadataFilesToBeDeleted(metadataFiles, new HashMap<>(), staticLogger);
if (metadataFilesToBeDeleted.isEmpty()) {
staticLogger.debug("No metadata files to delete");
return;
}
staticLogger.debug(() -> "metadataFilesToBeDeleted = " + metadataFilesToBeDeleted);
// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);

// For all the files that we are keeping, fetch min and max generations
List<String> metadataFilesNotToBeDeleted = new ArrayList<>(metadataFiles);
metadataFilesNotToBeDeleted.removeAll(metadataFilesToBeDeleted);
staticLogger.debug(() -> "metadataFilesNotToBeDeleted = " + metadataFilesNotToBeDeleted);
// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});

// Delete stale metadata files
translogTransferManager.deleteMetadataFilesAsync(metadataFilesToBeDeleted, () -> {});
// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

// Delete stale primary terms
deleteStaleRemotePrimaryTerms(
metadataFilesNotToBeDeleted,
translogTransferManager,
new HashMap<>(),
new AtomicLong(Long.MAX_VALUE),
staticLogger
);
} catch (Exception e) {
@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
}

@Override
public void onFailure(Exception e) {
staticLogger.error("Exception while cleaning up metadata and primary terms", e);
}
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
};
translogTransferManager.listTranslogMetadataFilesAsync(listMetadataFilesListener);
}
}
}
@@ -683,12 +683,17 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen
@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
maxRemoteTranslogGenerationUploaded = generation;
long previousMinRemoteGenReferenced = minRemoteGenReferenced;
minRemoteGenReferenced = getMinFileGeneration();
if (previousMinRemoteGenReferenced != minRemoteGenReferenced) {
onMinRemoteGenReferencedChange();
}
logger.debug(
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}",
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}, minRemoteGenReferenced = {}",
primaryTerm,
generation,
maxSeqNo
maxSeqNo,
minRemoteGenReferenced
);
}

@@ -702,6 +707,10 @@ public void onUploadFailed(TransferSnapshot transferSnapshot, Exception ex) thro
}
}

protected void onMinRemoteGenReferencedChange() {

}

@Override
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minSeqNoToKeep;
@@ -170,6 +170,16 @@ public static Tuple<Long, Long> getMinMaxTranslogGenerationFromFilename(String f
}
}

public static long getMaxGenerationFromFileName(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
try {
return RemoteStoreUtils.invertLong(tokens[2]);
} catch (Exception e) {
logger.error(() -> new ParameterizedMessage("Exception while getting max generation from: {}", filename), e);
return -1;
}
}
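
A standalone sketch of what the parsing above does; the separator value and the inversion scheme are assumptions for illustration (the real ones are METADATA_SEPARATOR and RemoteStoreUtils.invertLong), but the key point is that token index 2 decodes to the max generation and a parse failure maps to -1, which callers treat as "unknown, keep the file".

    // Illustrative parse only; real filenames are produced by TranslogTransferMetadata.
    class MaxGenerationParseSketch {
        private static final String SEPARATOR = "__"; // assumed value of METADATA_SEPARATOR

        // Assumed inversion scheme of RemoteStoreUtils.invertLong: values are stored inverted
        // so that lexicographic ordering of filenames puts newer files first.
        private static long invert(long value) {
            return Long.MAX_VALUE - value;
        }

        public static void main(String[] args) {
            // Leading and trailing tokens are placeholders.
            String filename = "metadata" + SEPARATOR + invert(12) + SEPARATOR + invert(4567)
                + SEPARATOR + "1695600000000";
            String[] tokens = filename.split(SEPARATOR);
            long maxGeneration = invert(Long.parseLong(tokens[2])); // recovers 4567
            System.out.println("maxGeneration = " + maxGeneration);
        }
    }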

public static Tuple<Long, Long> getMinMaxPrimaryTermFromFilename(String filename) {
String[] tokens = filename.split(METADATA_SEPARATOR);
if (tokens.length < 7) {
