Skip to content

Commit

Permalink
Added IT/UT and changed setting scope
Browse files Browse the repository at this point in the history
Signed-off-by: Shubh Sahu <[email protected]>
  • Loading branch information
Shubh Sahu committed Apr 5, 2024
1 parent 5f4ca3a commit 0e1a2d3
Show file tree
Hide file tree
Showing 14 changed files with 110 additions and 39 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.indices.refresh;

import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.remotestore.RemoteStoreBaseIntegTestCase;
import org.opensearch.test.OpenSearchIntegTestCase;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RefreshRemoteTranslogFilesIT extends RemoteStoreBaseIntegTestCase {

protected final String INDEX_NAME = "remote-store-test-idx-1";

public void testRefreshOnTooManyRemoteTranslogFiles() throws Exception {

internalCluster().startClusterManagerOnlyNode();
internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
ensureGreen(INDEX_NAME);

ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest();
updateSettingsRequest.persistentSettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "5")
);
assertAcked(client().admin().cluster().updateSettings(updateSettingsRequest).actionGet());

// indexing 35 documents (7 bulk requests), which should trigger refresh, and hence number of documents(searchable) should be 35.
// Here refresh will be triggered on 6th and 7th bulk request. One extra since translogs will be marked
// unreferenced after 6th refresh completes and will be trimmed on 7th bulk request call.
for (int i = 0; i < 7; i++) {
indexBulk(INDEX_NAME, 5);
}

// refresh will not trigger here, hence total searchable documents will be 35 (not 40)
indexBulk(INDEX_NAME, 5);

long currentDocCount = client().prepareSearch(INDEX_NAME).setSize(0).get().getHits().getTotalHits().value;
assertEquals(35, currentDocCount);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class MigrationBaseTestCase extends OpenSearchIntegTestCase {

protected Path segmentRepoPath;
protected Path translogRepoPath;
boolean addRemote = false;
protected boolean addRemote = false;
Settings extraSettings = Settings.EMPTY;

private final List<String> documentKeys = List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,8 @@ public void apply(Settings value, Settings current, Settings previous) {

RemoteStoreSettings.CLUSTER_REMOTE_INDEX_SEGMENT_METADATA_RETENTION_MAX_COUNT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING
RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,6 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
// Settings for concurrent segment search
IndexSettings.INDEX_CONCURRENT_SEGMENT_SEARCH_SETTING,

IndexSettings.INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES,

// validate that built-in similarities don't get redefined
Setting.groupSetting("index.similarity.", (s) -> {
Map<String, Settings> groups = s.getAsGroups();
Expand Down
24 changes: 0 additions & 24 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -719,19 +719,6 @@ public static IndexMergePolicy fromString(String text) {
Property.IndexScope
);

/**
* Index setting describing the maximum number of uncommitted translog files at a time.
* If breached the shard will be Refreshed, this is to control large number of tranlog files
* downloads during recovery.
*/
public static final Setting<Integer> INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES = Setting.intSetting(
"index.max_uncommitted_translog_files",
300,
1,
Property.Dynamic,
Property.IndexScope
);

private final Index index;
private final Version version;
private final Logger logger;
Expand Down Expand Up @@ -815,7 +802,6 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
private volatile long mappingTotalFieldsLimit;
private volatile long mappingDepthLimit;
private volatile long mappingFieldNameLengthLimit;
private volatile int maxUncommittedTranslogFiles;

/**
* The maximum number of refresh listeners allows on this shard.
Expand Down Expand Up @@ -994,7 +980,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));
checkPendingFlushEnabled = scopedSettings.get(INDEX_CHECK_PENDING_FLUSH_ENABLED);
defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE);
maxUncommittedTranslogFiles = scopedSettings.get(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES);
/* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7).
* For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type
* to higher bytes size like integer to long. This behavior was changed from OpenSearch 2.7 version not to
Expand Down Expand Up @@ -1119,15 +1104,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING,
this::setDocIdFuzzySetFalsePositiveProbability
);
scopedSettings.addSettingsUpdateConsumer(INDEX_MAX_UNCOMMITTED_TRANSLOG_FILES, this::setMaxUncommittedTranslogFiles);
}

private void setMaxUncommittedTranslogFiles(int maxUncommittedTranslogFiles) {
this.maxUncommittedTranslogFiles = maxUncommittedTranslogFiles;
}

public int getMaxUncommittedTranslogFiles() {
return maxUncommittedTranslogFiles;
}

private void setSearchIdleAfter(TimeValue searchIdleAfter) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4489,11 +4489,11 @@ public Durability getTranslogDurability() {
* threshold count determined by {@code index.translog.max_uncommitted_files_threshold}
* @return {@code true} if the shard should be Refreshed
*/
boolean shouldRefreshShard() {
public boolean shouldRefreshShard() {
final Engine engine = getEngineOrNull();
if (engine != null) {
try {
return engine.translogManager().shouldRefreshShard(indexSettings.getMaxUncommittedTranslogFiles());
return engine.translogManager().shouldRefreshShard();
} catch (final AlreadyClosedException e) {
// we are already closed, no need to Refresh
}
Expand Down Expand Up @@ -4570,9 +4570,8 @@ public void onAfter() {
}
}
} else if (shouldRefreshShard() && isRefreshRunning.compareAndSet(false, true)) {

if (shouldRefreshShard()) {
logger.info("submitting async Refresh request");
logger.debug("submitting async Refresh request");
final AbstractRunnable _refresh = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ public void close() throws IOException {
}

@Override
public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) {
return getTranslog(true).shouldRefreshShard(maxUncommittedTranslogFilesThreshold);
public boolean shouldRefreshShard() {
return getTranslog(true).shouldRefreshShard();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public Translog.TranslogGeneration getTranslogGeneration() {
}

@Override
public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) {
public boolean shouldRefreshShard() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -660,7 +660,7 @@ int availablePermits() {
* @return {@code true} if the shard should be Refreshed
*/
@Override
public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) {
return readers.size() > maxUncommittedTranslogFilesThreshold;
public boolean shouldRefreshShard() {
return readers.size() >= translogTransferManager.getMaxRemoteReferencedTranslogFilesSettings();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2059,7 +2059,7 @@ public long getMinUnreferencedSeqNoInSegments(long minUnrefCheckpointInLastCommi
*
* @return {@code true} if the shard should be Refreshed
*/
public boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold) {
public boolean shouldRefreshShard() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,5 +143,5 @@ public interface TranslogManager {

Translog.TranslogGeneration getTranslogGeneration();

boolean shouldRefreshShard(int maxUncommittedTranslogFilesThreshold);
boolean shouldRefreshShard();
}
Original file line number Diff line number Diff line change
Expand Up @@ -585,4 +585,8 @@ public void onFailure(Exception e) {
throw e;
}
}

public int getMaxRemoteReferencedTranslogFilesSettings() {
return this.remoteStoreSettings.getMaxRemoteReferencedTranslogFiles();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,21 @@ public class RemoteStoreSettings {
Property.Dynamic
);

/**
* Controls the maximum referenced remote translog files. If breached the shard will be Refreshed.
*/
public static final Setting<Integer> CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES = Setting.intSetting(
"cluster.remote_store.max_referenced_translog_files",
300,
1,
Property.Dynamic,
Property.NodeScope
);

private volatile TimeValue clusterRemoteTranslogBufferInterval;
private volatile int minRemoteSegmentMetadataFiles;
private volatile TimeValue clusterRemoteTranslogTransferTimeout;
private volatile int maxRemoteReferencedTranslogFiles;

public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(settings);
Expand All @@ -87,6 +99,9 @@ public RemoteStoreSettings(Settings settings, ClusterSettings clusterSettings) {
CLUSTER_REMOTE_TRANSLOG_TRANSFER_TIMEOUT_SETTING,
this::setClusterRemoteTranslogTransferTimeout
);

maxRemoteReferencedTranslogFiles = CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.get(settings);
clusterSettings.addSettingsUpdateConsumer(CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES, this::setMaxRemoteReferencedTranslogFiles);
}

public TimeValue getClusterRemoteTranslogBufferInterval() {
Expand All @@ -112,4 +127,12 @@ public TimeValue getClusterRemoteTranslogTransferTimeout() {
private void setClusterRemoteTranslogTransferTimeout(TimeValue clusterRemoteTranslogTransferTimeout) {
this.clusterRemoteTranslogTransferTimeout = clusterRemoteTranslogTransferTimeout;
}

public int getMaxRemoteReferencedTranslogFiles() {
return maxRemoteReferencedTranslogFiles;
}

private void setMaxRemoteReferencedTranslogFiles(int maxRemoteReferencedTranslogFiles) {
this.maxRemoteReferencedTranslogFiles = maxRemoteReferencedTranslogFiles;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,24 @@ public void testClusterRemoteTranslogTransferTimeout() {
);
assertEquals(TimeValue.timeValueSeconds(40), remoteStoreSettings.getClusterRemoteTranslogTransferTimeout());
}

public void testMaxRemoteReferencedTranslogFiles() {
// Test default value
assertEquals(300, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles());

// Test override with valid value
clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "100").build()
);
assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles());

// Test override with value less than minimum
assertThrows(
IllegalArgumentException.class,
() -> clusterSettings.applySettings(
Settings.builder().put(RemoteStoreSettings.CLUSTER_REMOTE_MAX_REFERENCED_TRANSLOG_FILES.getKey(), "0").build()
)
);
assertEquals(100, remoteStoreSettings.getMaxRemoteReferencedTranslogFiles());
}
}

0 comments on commit 0e1a2d3

Please sign in to comment.