Skip to content

Commit

Permalink
[Remote Store] Cleanup local-only translog files if no metadata in re…
Browse files Browse the repository at this point in the history
…mote (#12691) (#13261)

Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
sachinpkale authored Apr 18, 2024
1 parent a14ec07 commit a3f303b
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.indices.recovery.RecoverySettings;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -72,7 +73,7 @@ public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(MockTransportService.TestPlugin.class);
return Arrays.asList(MockTransportService.TestPlugin.class, MockFsRepositoryPlugin.class);
}

@Override
Expand Down Expand Up @@ -803,4 +804,71 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep
docs + moreDocs + uncommittedOps
);
}

// Test local only translog files which are not uploaded to remote store (no metadata present in remote)
// Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE.
public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
clusterSettingsSuppliedByTest = true;

// Overriding settings to use AsyncMultiStreamBlobContainer
Settings settings = Settings.builder()
.put(super.nodeSettings(1))
.put(
remoteStoreClusterSettings(
REPOSITORY_NAME,
segmentRepoPath,
MockFsRepositoryPlugin.TYPE,
REPOSITORY_2_NAME,
translogRepoPath,
MockFsRepositoryPlugin.TYPE
)
)
.build();

internalCluster().startClusterManagerOnlyNode(settings);
String dataNode = internalCluster().startDataOnlyNode(settings);

// 1. Create index with 0 replica
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

// 2. Index docs
int searchableDocs = 0;
for (int i = 0; i < randomIntBetween(1, 5); i++) {
indexBulk(INDEX_NAME, 15);
refresh(INDEX_NAME);
searchableDocs += 15;
}
indexBulk(INDEX_NAME, 15);

assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs);

// 3. Delete metadata from remote translog
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);

Path translogMetaDataPath = Path.of(String.valueOf(translogRepoPath), indexUUID, "/0/translog/metadata");

try (Stream<Path> files = Files.list(translogMetaDataPath)) {
files.forEach(p -> {
try {
Files.delete(p);
} catch (IOException e) {
// Ignore
}
});
}

internalCluster().restartNode(dataNode);

ensureGreen(INDEX_NAME);

assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs);
indexBulk(INDEX_NAME, 15);
refresh(INDEX_NAME);
assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs + 15);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.core.util.FileSystemUtils;
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
import org.opensearch.index.translog.transfer.FileTransferTracker;
import org.opensearch.index.translog.transfer.TransferSnapshot;
Expand Down Expand Up @@ -200,7 +201,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
throw ex;
}

static private void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
logger.debug("Downloading translog files from remote");
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
Expand Down Expand Up @@ -235,10 +236,32 @@ static private void downloadOnce(TranslogTransferManager translogTransferManager
location.resolve(Translog.getCommitCheckpointFileName(translogMetadata.getGeneration())),
location.resolve(Translog.CHECKPOINT_FILE_NAME)
);
} else {
// When code flow reaches this block, it means we don't have any translog files uploaded to remote store.
// If local filesystem contains empty translog or no translog, we don't do anything.
// If local filesystem contains non-empty translog, we clean up these files and create empty translog.
logger.debug("No translog files found on remote, checking local filesystem for cleanup");
if (FileSystemUtils.exists(location.resolve(CHECKPOINT_FILE_NAME))) {
final Checkpoint checkpoint = readCheckpoint(location);
if (isEmptyTranslog(checkpoint) == false) {
logger.debug("Translog files exist on local without any metadata in remote, cleaning up these files");
// Creating empty translog will cleanup the older un-referenced tranlog files, we don't have to explicitly delete
Translog.createEmptyTranslog(location, translogTransferManager.getShardId(), checkpoint);
} else {
logger.debug("Empty translog on local, skipping clean-up");
}
}
}
logger.debug("downloadOnce execution completed");
}

private static boolean isEmptyTranslog(Checkpoint checkpoint) {
return checkpoint.generation == checkpoint.minTranslogGeneration
&& checkpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED
&& checkpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED
&& checkpoint.numOps == 0;
}

public static TranslogTransferManager buildTranslogTransferManager(
BlobStoreRepository blobStoreRepository,
ThreadPool threadPool,
Expand Down
38 changes: 34 additions & 4 deletions server/src/main/java/org/opensearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -2011,17 +2011,47 @@ public static String createEmptyTranslog(
final long primaryTerm,
@Nullable final String translogUUID,
@Nullable final ChannelFactory factory
) throws IOException {
return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, translogUUID, factory, 1);
}

public static String createEmptyTranslog(final Path location, final ShardId shardId, Checkpoint checkpoint) throws IOException {
final Path highestGenTranslogFile = location.resolve(getFilename(checkpoint.generation));
final TranslogHeader translogHeader;
try (FileChannel channel = FileChannel.open(highestGenTranslogFile, StandardOpenOption.READ)) {
translogHeader = TranslogHeader.read(highestGenTranslogFile, channel);
}
final String translogUUID = translogHeader.getTranslogUUID();
final long primaryTerm = translogHeader.getPrimaryTerm();
final ChannelFactory channelFactory = FileChannel::open;
return Translog.createEmptyTranslog(
location,
shardId,
SequenceNumbers.NO_OPS_PERFORMED,
primaryTerm,
translogUUID,
channelFactory,
checkpoint.generation + 1
);
}

public static String createEmptyTranslog(
final Path location,
final ShardId shardId,
final long initialGlobalCheckpoint,
final long primaryTerm,
@Nullable final String translogUUID,
@Nullable final ChannelFactory factory,
final long generation
) throws IOException {
IOUtils.rm(location);
Files.createDirectories(location);

final long generation = 1L;
final long minTranslogGeneration = 1L;
final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open;
final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID();
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
final Path translogFile = location.resolve(getFilename(generation));
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration);
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, generation);

Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
final TranslogWriter writer = TranslogWriter.create(
Expand All @@ -2031,7 +2061,7 @@ public static String createEmptyTranslog(
translogFile,
channelFactory,
EMPTY_TRANSLOG_BUFFER_SIZE,
minTranslogGeneration,
generation,
initialGlobalCheckpoint,
() -> {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1711,6 +1711,82 @@ public void testDownloadWithRetries() throws IOException {
RemoteFsTranslog.download(mockTransfer, location, logger);
}

// No translog data in local as well as remote, we skip creating empty translog
public void testDownloadWithNoTranslogInLocalAndRemote() throws IOException {
Path location = createTempDir();

TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
when(mockTransfer.readMetadata()).thenReturn(null);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

Path[] filesBeforeDownload = FileSystemUtils.files(location);
RemoteFsTranslog.download(mockTransfer, location, logger);
assertEquals(filesBeforeDownload, FileSystemUtils.files(location));
}

// No translog data in remote but non-empty translog is present in local. In this case, we delete all the files
// from local file system and create empty translog
public void testDownloadWithTranslogOnlyInLocal() throws IOException {
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
when(mockTransfer.readMetadata()).thenReturn(null);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

Path location = createTempDir();
for (Path file : FileSystemUtils.files(translogDir)) {
Files.copy(file, location.resolve(file.getFileName()));
}

Checkpoint existingCheckpoint = Translog.readCheckpoint(location);

TranslogTransferManager finalMockTransfer = mockTransfer;
RemoteFsTranslog.download(finalMockTransfer, location, logger);

Path[] filesPostDownload = FileSystemUtils.files(location);
assertEquals(2, filesPostDownload.length);
assertTrue(
filesPostDownload[0].getFileName().toString().contains("translog.ckp")
|| filesPostDownload[1].getFileName().toString().contains("translog.ckp")
);

Checkpoint newEmptyTranslogCheckpoint = Translog.readCheckpoint(location);
// Verify that the new checkpoint points to empty translog
assertTrue(
newEmptyTranslogCheckpoint.generation == newEmptyTranslogCheckpoint.minTranslogGeneration
&& newEmptyTranslogCheckpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED
&& newEmptyTranslogCheckpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED
&& newEmptyTranslogCheckpoint.numOps == 0
);
assertTrue(newEmptyTranslogCheckpoint.generation > existingCheckpoint.generation);
assertEquals(newEmptyTranslogCheckpoint.globalCheckpoint, existingCheckpoint.globalCheckpoint);
}

// No translog data in remote and empty translog in local. We skip creating another empty translog
public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
when(mockTransfer.readMetadata()).thenReturn(null);
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);

Path location = createTempDir();
for (Path file : FileSystemUtils.files(translogDir)) {
Files.copy(file, location.resolve(file.getFileName()));
}

TranslogTransferManager finalMockTransfer = mockTransfer;

// download first time will ensure creating empty translog
RemoteFsTranslog.download(finalMockTransfer, location, logger);
Path[] filesPostFirstDownload = FileSystemUtils.files(location);

// download on empty translog should be a no-op
RemoteFsTranslog.download(finalMockTransfer, location, logger);
Path[] filesPostSecondDownload = FileSystemUtils.files(location);

assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
}

public class ThrowingBlobRepository extends FsRepository {

private final Environment environment;
Expand Down

0 comments on commit a3f303b

Please sign in to comment.