diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index c6c312d6b6eea..29ba5d7504e26 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -237,6 +237,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings { // Settings for concurrent segment search IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING, + IndexSettings.INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, + // validate that built-in similarities don't get redefined Setting.groupSetting("index.similarity.", (s) -> { Map groups = s.getAsGroups(); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 82875564c1c07..16b43c65c1718 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -719,6 +719,19 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + /** + * Index setting describing the maximum number of uncommitted translog files at a time. + * If breached, the shard will be refreshed; this limits the number of translog files + * downloaded during recovery. 
+ */ + public static final Setting<Integer> INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES = Setting.intSetting( + "index.max_uncommitted_translog_files", + 300, + 1, + Property.Dynamic, + Property.IndexScope + ); + private final Index index; private final Version version; private final Logger logger; @@ -802,6 +815,7 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { private volatile long mappingTotalFieldsLimit; private volatile long mappingDepthLimit; private volatile long mappingFieldNameLengthLimit; + private volatile int maxUncommittedTranslogFiles; /** * The maximum number of refresh listeners allows on this shard. @@ -980,6 +994,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY)); checkPendingFlushEnabled = scopedSettings.get(INDEX_CHECK_PENDING_FLUSH_ENABLED); defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE); + maxUncommittedTranslogFiles = scopedSettings.get(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES); /* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7). * For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type * to higher bytes size like integer to long. 
This behavior was changed from OpenSearch 2.7 version not to @@ -1104,6 +1119,15 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING, this::setDocIdFuzzySetFalsePositiveProbability ); + scopedSettings.addSettingsUpdateConsumer(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, this::setMaxUncommittedTranslogFiles); + } + + private void setMaxUncommittedTranslogFiles(int maxUncommittedTranslogFiles) { + this.maxUncommittedTranslogFiles = maxUncommittedTranslogFiles; + } + + public int getMaxUncommittedTranslogFiles() { + return maxUncommittedTranslogFiles; } private void setSearchIdleAfter(TimeValue searchIdleAfter) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 26dbbbcdee7c0..e0202b7404a88 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -4484,6 +4484,33 @@ public Durability getTranslogDurability() { // we can not protect with a lock since we "release" on a different thread private final AtomicBoolean flushOrRollRunning = new AtomicBoolean(); + /** + * Tests whether the shard should be refreshed because the number of translog files has breached the + * threshold count configured by {@code index.max_uncommitted_translog_files}. + * @return {@code true} if the shard should be refreshed + */ + boolean shouldRefreshShard() { + final Engine engine = getEngineOrNull(); + if (engine != null) { + try { + return engine.translogManager().shouldRefreshShard(indexSettings.getMaxUncommittedTranslogFiles()); + } catch (final AlreadyClosedException e) { + // we are already closed, no need to refresh + } + } + return false; + } + + private final AtomicBoolean isRefreshRunning = new AtomicBoolean(); + + /** + * Performs a blocking refresh and then trims the unreferenced translog files. + */ + private 
void refreshAndTrimTranslogfiles(String source) throws IOException { + refresh(source); + getEngine().translogManager().trimUnreferencedTranslogFiles(); + } + /** * Schedules a flush or translog generation roll if needed but will not schedule more than one concurrently. The operation will be * executed asynchronously on the flush thread pool. @@ -4548,6 +4575,35 @@ public void onAfter() { flushOrRollRunning.compareAndSet(true, false); } } + } else if (shouldRefreshShard() && isRefreshRunning.compareAndSet(false, true)) { + + if (shouldRefreshShard()) { + logger.info("submitting async Refresh request"); + final AbstractRunnable refreshAndTrimTranslog = new AbstractRunnable() { + @Override + public void onFailure(Exception e) { + logger.warn("forced refresh failed after number of uncommitted translog files breached limit", e); + } + + @Override + protected void doRun() throws Exception { + refreshAndTrimTranslogfiles("Too many uncommitted Translog files"); + } + + @Override + public boolean isForceExecution() { + return true; + } + + @Override + public void onAfter() { + isRefreshRunning.compareAndSet(true, false); + } + }; + threadPool.executor(ThreadPool.Names.REFRESH).execute(refreshAndTrimTranslog); + } else { + isRefreshRunning.compareAndSet(true, false); + } } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index a22c538286a88..34044b45bf859 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -469,4 +469,9 @@ public boolean shouldPeriodicallyFlush(long localCheckpointOfLastCommit, long fl public void close() throws IOException { IOUtils.closeWhileHandlingException(translog); } + + @Override + public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + return 
getTranslog(true).shouldRefreshShard(maxUncommittedTranslogFilesThreshold); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index b4aa7865570a6..d918b0edca120 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -134,4 +134,9 @@ public Releasable drainSync() { public Translog.TranslogGeneration getTranslogGeneration() { return null; } + + @Override + public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + return false; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 67799f0465c29..6154cb3454699 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -652,4 +652,15 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi int availablePermits() { return syncPermit.availablePermits(); } + + /** + * Tests whether the shard should be refreshed. 
+ * The shard should be refreshed when the number of translog readers exceeds the given translog files threshold. + * + * @return {@code true} if {@code readers.size()} is greater than {@code maxUncommittedTranslogFilesThreshold} + */ + @Override + public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + return readers.size() > maxUncommittedTranslogFilesThreshold; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 7c50ed6ecd58f..ced8ebc450cfa 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -2052,4 +2052,14 @@ public static String createEmptyTranslog( public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommit) { return minUnrefCheckpointInLastCommit; } + + /** + * Tests whether the shard should be refreshed. + * This implementation ignores the given translog files threshold and never requests a refresh. + * + * @return {@code false}, always, in this implementation + */ + public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) { + return false; + } } diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index e1a0b7d1c1293..25d9397287d91 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -142,4 +142,6 @@ public interface TranslogManager { Releasable drainSync(); Translog.TranslogGeneration getTranslogGeneration(); + + boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold); }