Skip to content

Commit

Permalink
Fix the replication failure issues
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <[email protected]>
  • Loading branch information
Sachin Kale committed Apr 9, 2024
1 parent 84557bc commit d605254
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5054,7 +5054,11 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
.filter(file -> file.startsWith(RemoteSegmentStoreDirectory.SEGMENT_INFOS_SNAPSHOT_PREFIX))
.collect(Collectors.toList());
assert segmentInfosSnapshotFilenames.size() == 1;
infosSnapshot = SegmentInfos.readCommit(store.directory(), segmentInfosSnapshotFilenames.get(0));
infosSnapshot = SegmentInfos.readCommit(
store.directory(),
store.directory().openChecksumInput(segmentInfosSnapshotFilenames.get(0), IOContext.READ),
remoteSegmentMetadata.getGeneration()
);
} else {
infosSnapshot = store.buildSegmentInfos(
remoteSegmentMetadata.getSegmentInfosBytes(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,12 @@ public void uploadMetadata(
) {
segmentInfosSnapshot.write(segmentInfosIndexOutput);
}
copyFrom(storeDirectory, segmentInfoSnapshotFilename, segmentInfoSnapshotFilename, IOContext.DEFAULT);
remoteDataDirectory.copyFrom(
storeDirectory,
segmentInfoSnapshotFilename,
segmentInfoSnapshotFilename,
IOContext.DEFAULT
);
String segmentInfosSnapshotChecksum = getChecksumOfLocalFile(storeDirectory, segmentInfoSnapshotFilename);
UploadedSegmentMetadata segmentInfosSnapshotMetadata = new UploadedSegmentMetadata(
segmentInfoSnapshotFilename,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.RemoteSegmentStoreDirectory;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
import org.opensearch.indices.recovery.MultiFileWriter;
Expand All @@ -36,6 +37,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Set;
Expand Down Expand Up @@ -283,19 +285,31 @@ private void updateFileRecoveryBytes(String fileName, long bytesRecovered) {
private void finalizeReplication(CheckpointInfoResponse checkpointInfoResponse) throws OpenSearchCorruptionException {
cancellableThreads.checkForCancel();
state.setStage(SegmentReplicationState.Stage.FINALIZE_REPLICATION);
byte[] segmentInfosBytes = checkpointInfoResponse.getInfosBytes();
// Handle empty SegmentInfos bytes for recovering replicas
if (checkpointInfoResponse.getInfosBytes() == null) {
if (segmentInfosBytes == null) {
return;
}
Store store = null;
try {
store = store();
store.incRef();
multiFileWriter.renameAllTempFiles();
final SegmentInfos infos = store.buildSegmentInfos(
checkpointInfoResponse.getInfosBytes(),
checkpointInfoResponse.getCheckpoint().getSegmentsGen()
);
final SegmentInfos infos;
if (segmentInfosBytes.length == 0) {
List<String> segmentInfosSnapshotFilenames = Arrays.stream(store.directory().listAll())
.filter(file -> file.startsWith(RemoteSegmentStoreDirectory.SEGMENT_INFOS_SNAPSHOT_PREFIX))
.collect(Collectors.toList());
assert segmentInfosSnapshotFilenames.size() == 1;
infos = SegmentInfos.readCommit(
store.directory(),
store.directory().openChecksumInput(segmentInfosSnapshotFilenames.get(0), IOContext.READ),
checkpointInfoResponse.getCheckpoint().getSegmentsGen()
);
store.deleteQuiet(segmentInfosSnapshotFilenames.get(0));
} else {
infos = store.buildSegmentInfos(segmentInfosBytes, checkpointInfoResponse.getCheckpoint().getSegmentsGen());
}
indexShard.finalizeReplication(infos);
} catch (CorruptIndexException | IndexFormatTooNewException | IndexFormatTooOldException ex) {
// this is a fatal exception at this stage.
Expand Down

0 comments on commit d605254

Please sign in to comment.