From 6f8d2319c181c0f6f5f4130cf184bffe2ad72b8e Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Thu, 21 Sep 2023 14:54:09 -0700 Subject: [PATCH] Refactor the merge policy extraction logic --- .../org/opensearch/index/IndexSettings.java | 182 +++++++++--------- .../index/LogByteSizeMergePolicyProvider.java | 14 +- .../index/MergePolicySettingsTests.java | 30 ++- 3 files changed, 121 insertions(+), 105 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 4938958154c68..b83673508e68b 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -83,12 +83,10 @@ */ @PublicApi(since = "1.0.0") public final class IndexSettings { - private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default"; + private static final String DEFAULT = "default"; private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; public static final String TIERED_MERGE_POLICY = "tiered"; public static final String LOG_BYTE_SIZE_MERGE_POLICY = "log_byte_size"; - private static final String DEFAULT_MERGE_POLICY = "default_merge_policy"; - private static final String DEFAULT_TIME_INDEX_MERGE_POLICY = "default_time_index_merge_policy"; public static final Setting> DEFAULT_FIELD_SETTING = Setting.listSetting( "index.query.default_field", Collections.singletonList("*"), @@ -569,20 +567,53 @@ public final class IndexSettings { public static final Setting INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString( "index.merge_on_flush.policy", - MERGE_ON_FLUSH_DEFAULT_POLICY, + DEFAULT, Property.IndexScope, Property.Dynamic ); - public static final Setting INDEX_MERGE_POLICY = Setting.simpleString( - "index.merge.policy", - DEFAULT_MERGE_POLICY, - Property.IndexScope - ); + public static final Setting INDEX_MERGE_POLICY = Setting.simpleString("index.merge.policy", DEFAULT, policy -> { + if (!(policy.isEmpty() + || policy.equals(DEFAULT) + || policy.equals(TIERED_MERGE_POLICY) + || policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) { + throw new IllegalArgumentException( + "The " + + IndexSettings.INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + policy + + ". Please use one of: " + + DEFAULT + + ", " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + }, Property.IndexScope); public static final Setting TIME_INDEX_MERGE_POLICY = Setting.simpleString( - "indices.time_index.merge.policy", - DEFAULT_TIME_INDEX_MERGE_POLICY, + "indices.time_index.default_index_merge_policy", + DEFAULT, + policy -> { + if (!(policy.isEmpty() + || policy.equals(DEFAULT) + || policy.equals(TIERED_MERGE_POLICY) + || policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) { + throw new IllegalArgumentException( + "The " + + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + policy + + ". Please use one of: " + + DEFAULT + + ", " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + }, Property.NodeScope ); @@ -668,8 +699,6 @@ public final class IndexSettings { private final MergeSchedulerConfig mergeSchedulerConfig; private final TieredMergePolicyProvider tieredMergePolicyProvider; private final LogByteSizeMergePolicyProvider logByteSizeMergePolicyProvider; - private final MergePolicyProvider defaultMergePolicyProvider; - private final MergePolicyProvider defaultTimeIndexMergePolicyProvider; private final IndexSortConfig indexSortConfig; private final IndexScopedSettings scopedSettings; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); @@ -747,9 +776,6 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { */ private volatile UnaryOperator mergeOnFlushPolicy; - private volatile MergePolicyProvider mergePolicyProvider; - private volatile MergePolicyProvider timeIndexMergePolicyProvider; - /** * Returns the default search fields for this index. */ @@ -867,10 +893,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING); this.tieredMergePolicyProvider = new TieredMergePolicyProvider(logger, this); this.logByteSizeMergePolicyProvider = new LogByteSizeMergePolicyProvider(logger, this); - this.defaultMergePolicyProvider = tieredMergePolicyProvider; - this.defaultTimeIndexMergePolicyProvider = tieredMergePolicyProvider; - setMergePolicyProvider(scopedSettings.get(INDEX_MERGE_POLICY)); - setTimeIndexMergePolicy(scopedSettings.get(INDEX_MERGE_POLICY), TIME_INDEX_MERGE_POLICY.get(nodeSettings)); this.indexSortConfig = new IndexSortConfig(this); searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE); @@ -1494,11 +1516,56 @@ public long getGcDeletesInMillis() { * @param isTimeIndex true if index contains @timestamp field */ public MergePolicy getMergePolicy(boolean isTimeIndex) { - if (isTimeIndex) { - return timeIndexMergePolicyProvider.getMergePolicy(); + String indexScopedPolicy = scopedSettings.get(INDEX_MERGE_POLICY); + MergePolicyProvider mergePolicyProvider; + if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { + mergePolicyProvider = tieredMergePolicyProvider; + } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { + mergePolicyProvider = logByteSizeMergePolicyProvider; + } else if (indexScopedPolicy.equals(DEFAULT) || Strings.isEmpty(indexScopedPolicy)) { + if (!isTimeIndex) { + mergePolicyProvider = tieredMergePolicyProvider; + } else { + String nodeScopedTimeIndexPolicy = TIME_INDEX_MERGE_POLICY.get(nodeSettings); + if (nodeScopedTimeIndexPolicy.equals(TIERED_MERGE_POLICY)) { + mergePolicyProvider = tieredMergePolicyProvider; + } else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { + mergePolicyProvider = logByteSizeMergePolicyProvider; + } else if (nodeScopedTimeIndexPolicy.equals(DEFAULT) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) { + mergePolicyProvider = tieredMergePolicyProvider; + } else { + throw new IllegalArgumentException( + "The " + + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + nodeScopedTimeIndexPolicy + + ". Please use one of: " + + DEFAULT + + ", " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + } } else { - return mergePolicyProvider.getMergePolicy(); + throw new IllegalArgumentException( + "The " + + IndexSettings.INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + indexScopedPolicy + + ". Please use one of: " + + DEFAULT + + ", " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + if (logger.isTraceEnabled()) { + logger.trace("Index: " + this.index.getName() + ", Merge policy used: " + mergePolicyProvider.toString()); } + return mergePolicyProvider.getMergePolicy(); } public T getValue(Setting setting) { @@ -1689,7 +1756,7 @@ public boolean isMergeOnFlushEnabled() { } private void setMergeOnFlushPolicy(String policy) { - if (Strings.isEmpty(policy) || MERGE_ON_FLUSH_DEFAULT_POLICY.equalsIgnoreCase(policy)) { + if (Strings.isEmpty(policy) || DEFAULT.equalsIgnoreCase(policy)) { mergeOnFlushPolicy = null; } else if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) { this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new; @@ -1700,78 +1767,13 @@ private void setMergeOnFlushPolicy(String policy) { + " has unsupported policy specified: " + policy + ". Please use one of: " - + MERGE_ON_FLUSH_DEFAULT_POLICY + + DEFAULT + ", " + MERGE_ON_FLUSH_MERGE_POLICY ); } } - private void setMergePolicyProvider(String indexScopedPolicy) { - if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { - this.mergePolicyProvider = tieredMergePolicyProvider; - } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { - this.mergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (indexScopedPolicy.equals(DEFAULT_MERGE_POLICY) || Strings.isEmpty(indexScopedPolicy)) { - this.mergePolicyProvider = defaultMergePolicyProvider; - } else { - throw new IllegalArgumentException( - "The " - + IndexSettings.INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + indexScopedPolicy - + ". Please use one of: " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY - ); - } - if (logger.isTraceEnabled()) { - logger.trace("Merge policy used: " + mergePolicyProvider.toString()); - } - } - - private void setTimeIndexMergePolicy(String indexScopedPolicy, String nodeScopedTimeIndexPolicy) { - if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { - this.timeIndexMergePolicyProvider = tieredMergePolicyProvider; - } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { - this.timeIndexMergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (indexScopedPolicy.equals(DEFAULT_MERGE_POLICY) || Strings.isEmpty(indexScopedPolicy)) { - if (nodeScopedTimeIndexPolicy.equals(TIERED_MERGE_POLICY)) { - this.timeIndexMergePolicyProvider = tieredMergePolicyProvider; - } else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { - this.timeIndexMergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (nodeScopedTimeIndexPolicy.equals(DEFAULT_TIME_INDEX_MERGE_POLICY) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) { - this.timeIndexMergePolicyProvider = defaultTimeIndexMergePolicyProvider; - } else { - throw new IllegalArgumentException( - "The " - + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + nodeScopedTimeIndexPolicy - + ". Please use one of: " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY - ); - } - } else { - throw new IllegalArgumentException( - "The " - + IndexSettings.INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + indexScopedPolicy - + ". Please use one of: " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY - ); - } - if (logger.isTraceEnabled()) { - logger.trace("Time index merge policy used: " + timeIndexMergePolicyProvider.toString()); - } - } - public Optional> getMergeOnFlushPolicy() { return Optional.ofNullable(mergeOnFlushPolicy); } diff --git a/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java index cf0ee2fff08e0..0e66338148099 100644 --- a/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java +++ b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java @@ -60,10 +60,8 @@ public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { ByteSizeUnit.GB ); - // settings for LogByteSizeMergePolicy - public static final Setting INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting( - "index.merge.policy.log_byte_size.merge_factor", + "index.merge.log_byte_size_policy.merge_factor", DEFAULT_MERGE_FACTOR, // keeping it same as default max merge at once for tiered merge policy 2, Setting.Property.Dynamic, @@ -71,35 +69,35 @@ public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { ); public static final Setting INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING = Setting.byteSizeSetting( - "index.merge.policy.log_byte_size.min_merge_mb", + "index.merge.log_byte_size_policy.min_merge_mb", DEFAULT_MIN_MERGE_MB, // keeping it same as default floor segment for tiered merge policy Setting.Property.Dynamic, Setting.Property.IndexScope ); public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING = Setting.byteSizeSetting( - "index.merge.policy.log_byte_size.max_merge_segment_mb", + "index.merge.log_byte_size_policy.max_merge_segment_mb", DEFAULT_MAX_MERGED_SEGMENT, // keeping default same as tiered merge policy Setting.Property.Dynamic, Setting.Property.IndexScope ); public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting( - "index.merge.policy.log_byte_size.max_merge_segment_mb_forced_merge", + "index.merge.log_byte_size_policy.max_merge_segment_mb_forced_merge", DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE, Setting.Property.Dynamic, Setting.Property.IndexScope ); public static final Setting INDEX_LBS_MAX_MERGED_DOCS_SETTING = Setting.intSetting( - "index.merge.policy.log_byte_size.max_merged_docs", + "index.merge.log_byte_size_policy.max_merged_docs", DEFAULT_MAX_MERGE_DOCS, Setting.Property.Dynamic, Setting.Property.IndexScope ); public static final Setting INDEX_LBS_NO_CFS_RATIO_SETTING = new Setting<>( - "index.merge.policy.log_byte_size.no_cfs_ratio", + "index.merge.log_byte_size_policy.no_cfs_ratio", Double.toString(DEFAULT_NO_CFS_RATIO), TieredMergePolicyProvider::parseNoCFSRatio, Setting.Property.Dynamic, diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index 7b39191482813..0ceaef60b7863 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -172,20 +172,35 @@ public void testMergePolicyPrecedence() throws IOException { } public void testInvalidMergePolicy() throws IOException { + + final Settings invalidSettings = Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "invalid").build(); IllegalArgumentException exc1 = expectThrows( IllegalArgumentException.class, - () -> indexSettings(Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "invalid").build()) + () -> IndexSettings.INDEX_MERGE_POLICY.get(invalidSettings) ); - assertThat(exc1.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")); - - Settings nodeSettings = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build(); IllegalArgumentException exc2 = expectThrows( IllegalArgumentException.class, - () -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings) + () -> indexSettings(invalidSettings).getMergePolicy(false) + ); + assertThat(exc2.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")); + + final Settings invalidSettings2 = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build(); + IllegalArgumentException exc3 = expectThrows( + IllegalArgumentException.class, + () -> IndexSettings.TIME_INDEX_MERGE_POLICY.get(invalidSettings2) + ); + assertThat( + exc3.getMessage(), + containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ") + ); + + IllegalArgumentException exc4 = expectThrows( + IllegalArgumentException.class, + () -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), invalidSettings2).getMergePolicy(true) ); assertThat( - exc2.getMessage(), + exc4.getMessage(), containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ") ); } @@ -482,7 +497,8 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { ); assertEquals( ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeMBForForcedMerge(), - new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB) + .getMbFrac(), 0.001 );