Refactor the merge policy extraction logic
rishabhmaurya committed Sep 21, 2023
1 parent dda9c5b commit 6f8d231
Showing 3 changed files with 121 additions and 105 deletions.
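For context on the diff below: the cached MergePolicyProvider fields and the DEFAULT_MERGE_POLICY / DEFAULT_TIME_INDEX_MERGE_POLICY constants are removed, the allowed values are now validated in the Setting definitions themselves, and getMergePolicy(boolean isTimeIndex) resolves the provider on each call. A minimal usage sketch for the two settings involved (the builder calls are the standard Settings API, the key names come from the diff, and the values shown are arbitrary examples rather than defaults introduced by this change):

    // import org.opensearch.common.settings.Settings;

    // Index-level selection, validated against "default", "tiered", "log_byte_size".
    Settings indexLevel = Settings.builder()
        .put("index.merge.policy", "log_byte_size")
        .build();

    // Node-level fallback, consulted only for time-series indices when the
    // index-level setting is empty or "default".
    Settings nodeLevel = Settings.builder()
        .put("indices.time_index.default_index_merge_policy", "log_byte_size")
        .build();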
182 changes: 92 additions & 90 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
@@ -83,12 +83,10 @@
*/
@PublicApi(since = "1.0.0")
public final class IndexSettings {
private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default";
private static final String DEFAULT = "default";
private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";
public static final String TIERED_MERGE_POLICY = "tiered";
public static final String LOG_BYTE_SIZE_MERGE_POLICY = "log_byte_size";
private static final String DEFAULT_MERGE_POLICY = "default_merge_policy";
private static final String DEFAULT_TIME_INDEX_MERGE_POLICY = "default_time_index_merge_policy";
public static final Setting<List<String>> DEFAULT_FIELD_SETTING = Setting.listSetting(
"index.query.default_field",
Collections.singletonList("*"),
@@ -569,20 +567,53 @@ public final class IndexSettings {

public static final Setting<String> INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString(
"index.merge_on_flush.policy",
MERGE_ON_FLUSH_DEFAULT_POLICY,
DEFAULT,
Property.IndexScope,
Property.Dynamic
);

public static final Setting<String> INDEX_MERGE_POLICY = Setting.simpleString(
"index.merge.policy",
DEFAULT_MERGE_POLICY,
Property.IndexScope
);
public static final Setting<String> INDEX_MERGE_POLICY = Setting.simpleString("index.merge.policy", DEFAULT, policy -> {
if (!(policy.isEmpty()
|| policy.equals(DEFAULT)
|| policy.equals(TIERED_MERGE_POLICY)
|| policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) {
throw new IllegalArgumentException(
"The "
+ IndexSettings.INDEX_MERGE_POLICY.getKey()
+ " has unsupported policy specified: "
+ policy
+ ". Please use one of: "
+ DEFAULT
+ ", "
+ TIERED_MERGE_POLICY
+ ", "
+ LOG_BYTE_SIZE_MERGE_POLICY
);
}
}, Property.IndexScope);

public static final Setting<String> TIME_INDEX_MERGE_POLICY = Setting.simpleString(
"indices.time_index.merge.policy",
DEFAULT_TIME_INDEX_MERGE_POLICY,
"indices.time_index.default_index_merge_policy",
DEFAULT,
policy -> {
if (!(policy.isEmpty()
|| policy.equals(DEFAULT)
|| policy.equals(TIERED_MERGE_POLICY)
|| policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) {
throw new IllegalArgumentException(
"The "
+ IndexSettings.TIME_INDEX_MERGE_POLICY.getKey()
+ " has unsupported policy specified: "
+ policy
+ ". Please use one of: "
+ DEFAULT
+ ", "
+ TIERED_MERGE_POLICY
+ ", "
+ LOG_BYTE_SIZE_MERGE_POLICY
);
}
},
Property.NodeScope
);

@@ -668,8 +699,6 @@ public final class IndexSettings {
private final MergeSchedulerConfig mergeSchedulerConfig;
private final TieredMergePolicyProvider tieredMergePolicyProvider;
private final LogByteSizeMergePolicyProvider logByteSizeMergePolicyProvider;
private final MergePolicyProvider defaultMergePolicyProvider;
private final MergePolicyProvider defaultTimeIndexMergePolicyProvider;
private final IndexSortConfig indexSortConfig;
private final IndexScopedSettings scopedSettings;
private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis();
@@ -747,9 +776,6 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
*/
private volatile UnaryOperator<MergePolicy> mergeOnFlushPolicy;

private volatile MergePolicyProvider mergePolicyProvider;
private volatile MergePolicyProvider timeIndexMergePolicyProvider;

/**
* Returns the default search fields for this index.
*/
@@ -867,10 +893,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING);
this.tieredMergePolicyProvider = new TieredMergePolicyProvider(logger, this);
this.logByteSizeMergePolicyProvider = new LogByteSizeMergePolicyProvider(logger, this);
this.defaultMergePolicyProvider = tieredMergePolicyProvider;
this.defaultTimeIndexMergePolicyProvider = tieredMergePolicyProvider;
setMergePolicyProvider(scopedSettings.get(INDEX_MERGE_POLICY));
setTimeIndexMergePolicy(scopedSettings.get(INDEX_MERGE_POLICY), TIME_INDEX_MERGE_POLICY.get(nodeSettings));
this.indexSortConfig = new IndexSortConfig(this);
searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER);
defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE);
@@ -1494,11 +1516,56 @@ public long getGcDeletesInMillis() {
* @param isTimeIndex true if index contains @timestamp field
*/
public MergePolicy getMergePolicy(boolean isTimeIndex) {
if (isTimeIndex) {
return timeIndexMergePolicyProvider.getMergePolicy();
String indexScopedPolicy = scopedSettings.get(INDEX_MERGE_POLICY);
MergePolicyProvider mergePolicyProvider;
if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) {
mergePolicyProvider = tieredMergePolicyProvider;
} else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) {
mergePolicyProvider = logByteSizeMergePolicyProvider;
} else if (indexScopedPolicy.equals(DEFAULT) || Strings.isEmpty(indexScopedPolicy)) {
if (!isTimeIndex) {
mergePolicyProvider = tieredMergePolicyProvider;
} else {
String nodeScopedTimeIndexPolicy = TIME_INDEX_MERGE_POLICY.get(nodeSettings);
if (nodeScopedTimeIndexPolicy.equals(TIERED_MERGE_POLICY)) {
mergePolicyProvider = tieredMergePolicyProvider;
} else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) {
mergePolicyProvider = logByteSizeMergePolicyProvider;
} else if (nodeScopedTimeIndexPolicy.equals(DEFAULT) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) {
mergePolicyProvider = tieredMergePolicyProvider;
} else {
throw new IllegalArgumentException(
"The "
+ IndexSettings.TIME_INDEX_MERGE_POLICY.getKey()
+ " has unsupported policy specified: "
+ nodeScopedTimeIndexPolicy
+ ". Please use one of: "
+ DEFAULT
+ ", "
+ TIERED_MERGE_POLICY
+ ", "
+ LOG_BYTE_SIZE_MERGE_POLICY
);
}
}
} else {
return mergePolicyProvider.getMergePolicy();
throw new IllegalArgumentException(
"The "
+ IndexSettings.INDEX_MERGE_POLICY.getKey()
+ " has unsupported policy specified: "
+ indexScopedPolicy
+ ". Please use one of: "
+ DEFAULT
+ ", "
+ TIERED_MERGE_POLICY
+ ", "
+ LOG_BYTE_SIZE_MERGE_POLICY
);
}
if (logger.isTraceEnabled()) {
logger.trace("Index: " + this.index.getName() + ", Merge policy used: " + mergePolicyProvider.toString());
}
return mergePolicyProvider.getMergePolicy();
}

public <T> T getValue(Setting<T> setting) {
@@ -1689,7 +1756,7 @@ public boolean isMergeOnFlushEnabled() {
}

private void setMergeOnFlushPolicy(String policy) {
if (Strings.isEmpty(policy) || MERGE_ON_FLUSH_DEFAULT_POLICY.equalsIgnoreCase(policy)) {
if (Strings.isEmpty(policy) || DEFAULT.equalsIgnoreCase(policy)) {
mergeOnFlushPolicy = null;
} else if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) {
this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new;
@@ -1700,78 +1767,13 @@ private void setMergeOnFlushPolicy(String policy) {
+ " has unsupported policy specified: "
+ policy
+ ". Please use one of: "
+ MERGE_ON_FLUSH_DEFAULT_POLICY
+ DEFAULT
+ ", "
+ MERGE_ON_FLUSH_MERGE_POLICY
);
}
}

private void setMergePolicyProvider(String indexScopedPolicy) {
if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) {
this.mergePolicyProvider = tieredMergePolicyProvider;
} else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) {
this.mergePolicyProvider = logByteSizeMergePolicyProvider;
} else if (indexScopedPolicy.equals(DEFAULT_MERGE_POLICY) || Strings.isEmpty(indexScopedPolicy)) {
this.mergePolicyProvider = defaultMergePolicyProvider;
} else {
throw new IllegalArgumentException(
"The "
+ IndexSettings.INDEX_MERGE_POLICY.getKey()
+ " has unsupported policy specified: "
+ indexScopedPolicy
+ ". Please use one of: "
+ TIERED_MERGE_POLICY
+ ", "
+ LOG_BYTE_SIZE_MERGE_POLICY
);
}
if (logger.isTraceEnabled()) {
logger.trace("Merge policy used: " + mergePolicyProvider.toString());
}
}

private void setTimeIndexMergePolicy(String indexScopedPolicy, String nodeScopedTimeIndexPolicy) {
if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) {
this.timeIndexMergePolicyProvider = tieredMergePolicyProvider;
} else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) {
this.timeIndexMergePolicyProvider = logByteSizeMergePolicyProvider;
} else if (indexScopedPolicy.equals(DEFAULT_MERGE_POLICY) || Strings.isEmpty(indexScopedPolicy)) {
if (nodeScopedTimeIndexPolicy.equals(TIERED_MERGE_POLICY)) {
this.timeIndexMergePolicyProvider = tieredMergePolicyProvider;
} else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) {
this.timeIndexMergePolicyProvider = logByteSizeMergePolicyProvider;
} else if (nodeScopedTimeIndexPolicy.equals(DEFAULT_TIME_INDEX_MERGE_POLICY) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) {
this.timeIndexMergePolicyProvider = defaultTimeIndexMergePolicyProvider;
} else {
throw new IllegalArgumentException(
"The "
+ IndexSettings.TIME_INDEX_MERGE_POLICY.getKey()
+ " has unsupported policy specified: "
+ nodeScopedTimeIndexPolicy
+ ". Please use one of: "
+ TIERED_MERGE_POLICY
+ ", "
+ LOG_BYTE_SIZE_MERGE_POLICY
);
}
} else {
throw new IllegalArgumentException(
"The "
+ IndexSettings.INDEX_MERGE_POLICY.getKey()
+ " has unsupported policy specified: "
+ indexScopedPolicy
+ ". Please use one of: "
+ TIERED_MERGE_POLICY
+ ", "
+ LOG_BYTE_SIZE_MERGE_POLICY
);
}
if (logger.isTraceEnabled()) {
logger.trace("Time index merge policy used: " + timeIndexMergePolicyProvider.toString());
}
}

public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
return Optional.ofNullable(mergeOnFlushPolicy);
}
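For readability, a condensed restatement of the precedence that the new getMergePolicy(boolean isTimeIndex) implements. The standalone method, its name, and the string return values are illustrative only; the real method returns MergePolicyProvider instances and builds a more detailed error message listing the allowed values.

    // Illustrative summary of the resolution order in getMergePolicy(boolean isTimeIndex).
    static String resolvePolicyName(String indexPolicy, String nodeTimeIndexPolicy, boolean isTimeIndex) {
        if (indexPolicy.equals("tiered") || indexPolicy.equals("log_byte_size")) {
            return indexPolicy;                                  // explicit index-level choice wins
        }
        if (indexPolicy.isEmpty() || indexPolicy.equals("default")) {
            if (!isTimeIndex) {
                return "tiered";                                 // regular indices always default to tiered
            }
            if (nodeTimeIndexPolicy.equals("log_byte_size")) {
                return "log_byte_size";                          // node-level override for time-series indices
            }
            if (nodeTimeIndexPolicy.isEmpty() || nodeTimeIndexPolicy.equals("default") || nodeTimeIndexPolicy.equals("tiered")) {
                return "tiered";
            }
            throw new IllegalArgumentException("unsupported time-index merge policy: " + nodeTimeIndexPolicy);
        }
        throw new IllegalArgumentException("unsupported merge policy: " + indexPolicy);
    }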
@@ -60,46 +60,44 @@ public class LogByteSizeMergePolicyProvider implements MergePolicyProvider {
ByteSizeUnit.GB
);

// settings for LogByteSizeMergePolicy

public static final Setting<Integer> INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting(
"index.merge.policy.log_byte_size.merge_factor",
"index.merge.log_byte_size_policy.merge_factor",
DEFAULT_MERGE_FACTOR, // keeping it same as default max merge at once for tiered merge policy
2,
Setting.Property.Dynamic,
Setting.Property.IndexScope
);

public static final Setting<ByteSizeValue> INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING = Setting.byteSizeSetting(
"index.merge.policy.log_byte_size.min_merge_mb",
"index.merge.log_byte_size_policy.min_merge_mb",
DEFAULT_MIN_MERGE_MB, // keeping it same as default floor segment for tiered merge policy
Setting.Property.Dynamic,
Setting.Property.IndexScope
);

public static final Setting<ByteSizeValue> INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING = Setting.byteSizeSetting(
"index.merge.policy.log_byte_size.max_merge_segment_mb",
"index.merge.log_byte_size_policy.max_merge_segment_mb",
DEFAULT_MAX_MERGED_SEGMENT, // keeping default same as tiered merge policy
Setting.Property.Dynamic,
Setting.Property.IndexScope
);

public static final Setting<ByteSizeValue> INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting(
"index.merge.policy.log_byte_size.max_merge_segment_mb_forced_merge",
"index.merge.log_byte_size_policy.max_merge_segment_mb_forced_merge",
DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE,
Setting.Property.Dynamic,
Setting.Property.IndexScope
);

public static final Setting<Integer> INDEX_LBS_MAX_MERGED_DOCS_SETTING = Setting.intSetting(
"index.merge.policy.log_byte_size.max_merged_docs",
"index.merge.log_byte_size_policy.max_merged_docs",
DEFAULT_MAX_MERGE_DOCS,
Setting.Property.Dynamic,
Setting.Property.IndexScope
);

public static final Setting<Double> INDEX_LBS_NO_CFS_RATIO_SETTING = new Setting<>(
"index.merge.policy.log_byte_size.no_cfs_ratio",
"index.merge.log_byte_size_policy.no_cfs_ratio",
Double.toString(DEFAULT_NO_CFS_RATIO),
TieredMergePolicyProvider::parseNoCFSRatio,
Setting.Property.Dynamic,
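The LogByteSizeMergePolicyProvider settings are renamed from index.merge.policy.log_byte_size.* to index.merge.log_byte_size_policy.*. A brief sketch of selecting the policy and tuning two of the renamed knobs together (key names are taken from the diff, the values are arbitrary, and the builder calls are the standard Settings API):

    // import org.opensearch.common.settings.Settings;
    Settings settings = Settings.builder()
        .put("index.merge.policy", "log_byte_size")
        .put("index.merge.log_byte_size_policy.merge_factor", 16)
        .put("index.merge.log_byte_size_policy.max_merge_segment_mb", "5gb")
        .build();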
@@ -172,20 +172,35 @@ public void testMergePolicyPrecedence() throws IOException {
}

public void testInvalidMergePolicy() throws IOException {

final Settings invalidSettings = Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "invalid").build();
IllegalArgumentException exc1 = expectThrows(
IllegalArgumentException.class,
() -> indexSettings(Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "invalid").build())
() -> IndexSettings.INDEX_MERGE_POLICY.get(invalidSettings)
);

assertThat(exc1.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: "));

Settings nodeSettings = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build();
IllegalArgumentException exc2 = expectThrows(
IllegalArgumentException.class,
() -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings)
() -> indexSettings(invalidSettings).getMergePolicy(false)
);
assertThat(exc2.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: "));

final Settings invalidSettings2 = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build();
IllegalArgumentException exc3 = expectThrows(
IllegalArgumentException.class,
() -> IndexSettings.TIME_INDEX_MERGE_POLICY.get(invalidSettings2)
);
assertThat(
exc3.getMessage(),
containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")
);

IllegalArgumentException exc4 = expectThrows(
IllegalArgumentException.class,
() -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), invalidSettings2).getMergePolicy(true)
);
assertThat(
exc2.getMessage(),
exc4.getMessage(),
containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")
);
}
@@ -482,7 +497,8 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException {
);
assertEquals(
((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeMBForForcedMerge(),
new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB).getMbFrac(),
new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB)
.getMbFrac(),
0.001
);
