Skip to content

Commit

Permalink
Adding support to allow refresh interval -1
Browse files Browse the repository at this point in the history
Signed-off-by: Shubh Sahu <[email protected]>
  • Loading branch information
Shubh Sahu committed Apr 1, 2024
1 parent 618782d commit c9ce216
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
// Settings for concurrent segment search
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING,

IndexSettings.INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
Expand Down
24 changes: 24 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -717,6 +717,19 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

/**
* Index setting describing the maximum number of uncommitted translog files at a time.
* If breached the shard will be Refreshed, this is to control large number of tranlog files
* downloads during recovery.
*/
public static final Setting<Integer> INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES = Setting.intSetting(
"index.max_uncommitted_translog_files",
300,
1,
Property.Dynamic,
Property.IndexScope
);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -799,6 +812,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile long mappingTotalFieldsLimit;
private volatile long mappingDepthLimit;
private volatile long mappingFieldNameLengthLimit;
private volatile int maxUncommittedTranslogFiles;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -977,6 +991,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));
checkPendingFlushEnabled = scopedSettings.get(INDEX_CHECK_PENDING_FLUSH_ENABLED);
defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE);
maxUncommittedTranslogFiles = scopedSettings.get(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES);
/* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7).
* For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type
* to higher bytes size like integer to long. This behavior was changed from OpenSearch 2.7 version not to
Expand Down Expand Up @@ -1100,6 +1115,15 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING,
this::setDocIdFuzzySetFalsePositiveProbability
);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES,this::setMaxUncommittedTranslogFiles);
}

private void setMaxUncommittedTranslogFiles(int maxUncommittedTranslogFiles) {
this.maxUncommittedTranslogFiles = maxUncommittedTranslogFiles;
}

public int getMaxUncommittedTranslogFiles() {
return maxUncommittedTranslogFiles;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down
56 changes: 56 additions & 0 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -4452,6 +4452,33 @@ public Durability getTranslogDurability() {
// we can not protect with a lock since we "release" on a different thread
private final AtomicBoolean flushOrRollRunning = new AtomicBoolean();

/**
* Tests whether or not the shard should be Refreshed, if number of translog files breaches the
* threshold count determined by {@code index.translog.max_uncommitted_files_threshold}
* @return {@code true} if the shard should be Refreshed
*/
boolean shouldRefreshShard(){
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
return engine.translogManager().shouldRefreshShard(indexSettings.getMaxUncommittedTranslogFiles());
} catch (final AlreadyClosedException e) {
// we are already closed, no need to Refresh
}
}
return false;
}

private final AtomicBoolean isRefreshRunning = new AtomicBoolean();

/**
* Will Call a blocking Refresh and then Trim the Unreferenced Translog files
*/
private void refreshAndTrimTranslogfiles(String source) throws IOException {
refresh(source);
getEngine().translogManager().trimUnreferencedTranslogFiles();
}

/**
* Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be
* executed asynchronously on the flush thread pool.
Expand Down Expand Up @@ -4516,6 +4543,35 @@ public void onAfter() {
flushOrRollRunning.compareAndSet(true, false);
}
}
} else if (shouldRefreshShard() && isRefreshRunning.compareAndSet(false, true)) {

if (shouldRefreshShard()) {
logger.info("submitting async Refresh request");
final AbstractRunnable refreshAndTrimTranslog = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
logger.warn("forced refresh failed after number of uncommited translog files breached limit", e);
}

@Override
protected void doRun() throws Exception {
refreshAndTrimTranslogfiles("Too many uncommited Translog files");
}

@Override
public boolean isForceExecution() {
return true;
}

@Override
public void onAfter() {
isRefreshRunning.compareAndSet(true, false);
}
};
threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshAndTrimTranslog);
} else {
isRefreshRunning.compareAndSet(true, false);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -469,4 +469,9 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl
public void close() throws IOException {
IOUtils.closeWhileHandlingException(translog);
}

@Override
public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) {
return getTranslog(true).shouldRefreshShard(maxUncommittedTranslogFilesThreshold);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,4 +134,9 @@ public Releasable drainSync() {
public Translog.TranslogGeneration getTranslogGeneration() {
return null;
}

@Override
public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -626,4 +626,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi
int availablePermits() {
return syncPermit.availablePermits();
}

/**
* Tests whether or not the shard should be Refreshed.
* This test is based on the number of Translog files compared to configured number of Translog files threshold
*
* @return {@code true} if the shard should be Refreshed
*/
@Override
public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) {
return readers.size() > maxUncommittedTranslogFilesThreshold;
}
}
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/index/translog/Translog.java
Original file line number Diff line number Diff line change
Expand Up @@ -2052,4 +2052,14 @@ public static String createEmptyTranslog(
public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) {
return minUnrefCheckpointInLastCommit;
}

/**
* Tests whether or not the shard should be Refreshed.
* This test is based on the number of Translog files compared to configured number of Translog files threshold
*
* @return {@code true} if the shard should be Refreshed
*/
public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,6 @@ public interface TranslogManager {
Releasable drainSync();

Translog.TranslogGeneration getTranslogGeneration();

boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold);
}

0 comments on commit c9ce216

Please sign in to comment.