diff --git a/CHANGELOG.md b/CHANGELOG.md index d374a725c84ee..e3f1e3b751081 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) - Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107)) - Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679)) +- Configurable merge policy for datastreams with option to choose between LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 5b2afc44600bd..9f7b175c7d2f1 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -202,6 +202,13 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED, IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY, + IndexSettings.INDEX_DATASTREAM_MERGE_POLICY, + MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING, + MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING, + MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING, + MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING, + MergePolicyConfig.INDEX_LBS_MAX_MERGED_DOCS_SETTING, + MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING, IndexSettings.DEFAULT_SEARCH_PIPELINE, // Settings for Searchable Snapshots diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 1e4224c314f05..e8b0cf2049b30 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -84,6 +84,12 @@ @PublicApi(since = "1.0.0") public final class IndexSettings { private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default"; + + private static final String DATASTREAM_TIERED_MERGE_POLICY = "tiered"; + private static final String DATASTREAM_DEFAULT_POLICY = DATASTREAM_TIERED_MERGE_POLICY; + + private static final String DATASTREAM_LOG_BYTE_SIZE_MERGE_POLICY = "log_byte_size"; + private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; public static final Setting> DEFAULT_FIELD_SETTING = Setting.listSetting( @@ -571,6 +577,13 @@ public final class IndexSettings { Property.Dynamic ); + public static final Setting INDEX_DATASTREAM_MERGE_POLICY = Setting.simpleString( + "index.datastream_merge.policy", + DATASTREAM_DEFAULT_POLICY, + Property.IndexScope, + Property.Dynamic + ); + public static final Setting SEARCHABLE_SNAPSHOT_REPOSITORY = Setting.simpleString( "index.searchable_snapshot.repository", Property.IndexScope, @@ -729,6 +742,8 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { */ private volatile UnaryOperator mergeOnFlushPolicy; + private volatile MergePolicy dataStreamMergePolicy; + /** * Returns the default search fields for this index. 
*/ @@ -858,6 +873,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME); mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED); setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY)); + setDataStreamDefaultPolicy(scopedSettings.get(INDEX_DATASTREAM_MERGE_POLICY)); + defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE); /* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7). * For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type @@ -872,6 +889,34 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, mergePolicyConfig::setDeletesPctAllowed ); + + scopedSettings.addSettingsUpdateConsumer( + MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING, + mergePolicyConfig::setLBSMergeFactor + ); + + scopedSettings.addSettingsUpdateConsumer( + MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING, + mergePolicyConfig::setLBSMinMergedMB + ); + + scopedSettings.addSettingsUpdateConsumer( + MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING, + mergePolicyConfig::setLBSMaxMergeSegment + ); + + scopedSettings.addSettingsUpdateConsumer( + MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING, + mergePolicyConfig::setLBSMaxMergeMBForForcedMerge + ); + + scopedSettings.addSettingsUpdateConsumer( + MergePolicyConfig.INDEX_LBS_MAX_MERGED_DOCS_SETTING, + mergePolicyConfig::setLBSMaxMergeDocs + ); + + scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING, mergePolicyConfig::setLBSNoCFSRatio); + scopedSettings.addSettingsUpdateConsumer( MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed @@ -940,6 +985,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled); scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy); + scopedSettings.addSettingsUpdateConsumer(INDEX_DATASTREAM_MERGE_POLICY, this::setDataStreamDefaultPolicy); scopedSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_PIPELINE, this::setDefaultSearchPipeline); scopedSettings.addSettingsUpdateConsumer( INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, @@ -1444,6 +1490,10 @@ public MergePolicy getMergePolicy() { return mergePolicyConfig.getMergePolicy(); } + public MergePolicy getDataStreamMergePolicy() { + return dataStreamMergePolicy; + } + public T getValue(Setting setting) { return scopedSettings.get(setting); } @@ -1650,6 +1700,25 @@ private void setMergeOnFlushPolicy(String policy) { } } + private void setDataStreamDefaultPolicy(String policy) { + if (policy.equals(DATASTREAM_TIERED_MERGE_POLICY)) { + this.dataStreamMergePolicy = mergePolicyConfig.getMergePolicy(); + } else if (policy.equals(DATASTREAM_LOG_BYTE_SIZE_MERGE_POLICY)) { + this.dataStreamMergePolicy = mergePolicyConfig.getLogByteSizeMergePolicy(); + } else { + throw new IllegalArgumentException( + "The " + + 
IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + policy + + ". Please use one of: " + + DATASTREAM_TIERED_MERGE_POLICY + + ", " + + DATASTREAM_LOG_BYTE_SIZE_MERGE_POLICY + ); + } + } + public Optional> getMergeOnFlushPolicy() { return Optional.ofNullable(mergeOnFlushPolicy); } diff --git a/server/src/main/java/org/opensearch/index/MergePolicyConfig.java b/server/src/main/java/org/opensearch/index/MergePolicyConfig.java index fe2af21dfe039..c8f5be022f794 100644 --- a/server/src/main/java/org/opensearch/index/MergePolicyConfig.java +++ b/server/src/main/java/org/opensearch/index/MergePolicyConfig.java @@ -33,6 +33,7 @@ package org.opensearch.index; import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.TieredMergePolicy; @@ -41,15 +42,21 @@ import org.opensearch.core.common.unit.ByteSizeUnit; import org.opensearch.core.common.unit.ByteSizeValue; +import static org.apache.lucene.index.LogMergePolicy.DEFAULT_MAX_MERGE_DOCS; +import static org.apache.lucene.index.LogMergePolicy.DEFAULT_NO_CFS_RATIO; + /** * A shard in opensearch is a Lucene index, and a Lucene index is broken * down into segments. Segments are internal storage elements in the index * where the index data is stored, and are immutable up to delete markers. * Segments are, periodically, merged into larger segments to keep the * index size at bay and expunge deletes. + * This class customizes and exposes 2 merge policies from lucene - + * {@link LogByteSizeMergePolicy} and {@link TieredMergePolicy}. + * * *

- * Merges select segments of approximately equal size, subject to an allowed + * The tiered merge policy selects segments of approximately equal size, subject to an allowed * number of segments per tier. The merge policy is able to merge * non-adjacent segments, and separates how many segments are merged at once from how many * segments are allowed per tier. It also does not over-merge (i.e., cascade merges). @@ -122,11 +129,37 @@ * possibly either increase the max_merged_segment or issue an optimize * call for the index (try and aim to issue it on a low traffic time). + /** + *

+ * The LogByteSizeMergePolicy is an alternative merge policy designed to optimize the merging of segments in scenarios + * with time-based data. + * While the TieredMergePolicy is the default choice, the LogByteSizeMergePolicy can be configured + * as the default merge policy for time-based data using the index.datastream_merge.policy setting. + * + *

+ * Unlike the TieredMergePolicy, which prioritizes merging segments of equal sizes, the LogByteSizeMergePolicy + * specializes in merging adjacent segments efficiently. + * This characteristic makes it particularly well-suited for range queries on time-based data. + * Adjacent segments in time-based data typically contain documents with similar timestamps. + * When these segments are merged, the resulting segment covers a range of timestamps with reduced overlap compared + * to the adjacent segments. This reduced overlap remains even as segments grow older and larger, + * which can significantly benefit range queries on timestamps. + + *

+ * In contrast, the TieredMergePolicy does not honor this timestamp range optimization. It focuses on merging segments + * of equal sizes and does not consider adjacency. Consequently, as segments grow older and larger, + * the overlap of timestamp ranges among adjacent segments managed by TieredMergePolicy can increase. + * This can lead to inefficiencies in range queries on timestamps, as the number of segments to be scanned + * within a given timestamp range could become high. + * * @opensearch.internal */ public final class MergePolicyConfig { - private final OpenSearchTieredMergePolicy mergePolicy = new OpenSearchTieredMergePolicy(); + private final OpenSearchTieredMergePolicy openSearchTieredMergePolicy = new OpenSearchTieredMergePolicy(); + + private final LogByteSizeMergePolicy logByteSizeMergePolicy = new LogByteSizeMergePolicy(); + private final Logger logger; private final boolean mergesEnabled; @@ -137,6 +170,12 @@ public final class MergePolicyConfig { public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d; public static final double DEFAULT_RECLAIM_DELETES_WEIGHT = 2.0d; public static final double DEFAULT_DELETES_PCT_ALLOWED = 20.0d; + + public static final ByteSizeValue DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE = new ByteSizeValue( + Long.MAX_VALUE / ByteSizeUnit.GB.toBytes(1), + ByteSizeUnit.GB + ); + public static final Setting INDEX_COMPOUND_FORMAT_SETTING = new Setting<>( "index.compound_format", Double.toString(TieredMergePolicy.DEFAULT_NO_CFS_RATIO), @@ -194,6 +233,53 @@ public final class MergePolicyConfig { Property.Dynamic, Property.IndexScope ); + + // settings for LogByteSizeMergePolicy + + public static final Setting INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting( + "index.merge.policy.log_byte_size.merge_factor", + DEFAULT_MAX_MERGE_AT_ONCE, // keeping it same as default max merge at once for tiered merge policy + 2, + Property.Dynamic, + Property.IndexScope + ); + + public static final Setting INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING = Setting.byteSizeSetting( + "index.merge.policy.log_byte_size.min_merge_mb", + DEFAULT_FLOOR_SEGMENT, // keeping it same as default floor segment for tiered merge policy + Property.Dynamic, + Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING = Setting.byteSizeSetting( + "index.merge.policy.log_byte_size.max_merge_segment_mb", + DEFAULT_MAX_MERGED_SEGMENT, // keeping default same as tiered merge policy + Property.Dynamic, + Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting( + "index.merge.policy.log_byte_size.max_merge_segment_mb_forced_merge", + DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE, + Property.Dynamic, + Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGED_DOCS_SETTING = Setting.intSetting( + "index.merge.policy.log_byte_size.max_merged_docs", + DEFAULT_MAX_MERGE_DOCS, + Property.Dynamic, + Property.IndexScope + ); + + public static final Setting INDEX_LBS_NO_CFS_RATIO_SETTING = new Setting<>( + "index.merge.policy.log_byte_size.no_cfs_ratio", + Double.toString(DEFAULT_NO_CFS_RATIO), + MergePolicyConfig::parseNoCFSRatio, + Property.Dynamic, + Property.IndexScope + ); + // don't convert to Setting<> and register... 
we only set this in tests and register via a plugin public static final String INDEX_MERGE_ENABLED = "index.merge.enabled"; @@ -216,13 +302,25 @@ public final class MergePolicyConfig { ); } maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier); - mergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING)); - mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed); - mergePolicy.setFloorSegmentMB(floorSegment.getMbFrac()); - mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); - mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); - mergePolicy.setSegmentsPerTier(segmentsPerTier); - mergePolicy.setDeletesPctAllowed(deletesPctAllowed); + openSearchTieredMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING)); + openSearchTieredMergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed); + openSearchTieredMergePolicy.setFloorSegmentMB(floorSegment.getMbFrac()); + openSearchTieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); + openSearchTieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); + openSearchTieredMergePolicy.setSegmentsPerTier(segmentsPerTier); + openSearchTieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed); + + // Undocumented settings, works great with defaults + + logByteSizeMergePolicy.setMergeFactor(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING)); + logByteSizeMergePolicy.setMinMergeMB(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING).getMbFrac()); + logByteSizeMergePolicy.setMaxMergeMB(indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING).getMbFrac()); + logByteSizeMergePolicy.setMaxMergeMBForForcedMerge( + indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING).getMbFrac() + ); + logByteSizeMergePolicy.setMaxMergeDocs(indexSettings.getValue(INDEX_LBS_MAX_MERGED_DOCS_SETTING)); + logByteSizeMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_LBS_NO_CFS_RATIO_SETTING)); + if (logger.isTraceEnabled()) { logger.trace( "using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}]," @@ -239,31 +337,55 @@ public final class MergePolicyConfig { } void setSegmentsPerTier(Double segmentsPerTier) { - mergePolicy.setSegmentsPerTier(segmentsPerTier); + openSearchTieredMergePolicy.setSegmentsPerTier(segmentsPerTier); } void setMaxMergedSegment(ByteSizeValue maxMergedSegment) { - mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); + openSearchTieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); } void setMaxMergesAtOnce(Integer maxMergeAtOnce) { - mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); + openSearchTieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); } void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) { - mergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac()); + openSearchTieredMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac()); } void setExpungeDeletesAllowed(Double value) { - mergePolicy.setForceMergeDeletesPctAllowed(value); + openSearchTieredMergePolicy.setForceMergeDeletesPctAllowed(value); } void setNoCFSRatio(Double noCFSRatio) { - mergePolicy.setNoCFSRatio(noCFSRatio); + openSearchTieredMergePolicy.setNoCFSRatio(noCFSRatio); } void setDeletesPctAllowed(Double deletesPctAllowed) { - mergePolicy.setDeletesPctAllowed(deletesPctAllowed); + openSearchTieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed); + } + + void setLBSMergeFactor(int mergeFactor) { + 
logByteSizeMergePolicy.setMergeFactor(mergeFactor); + } + + void setLBSMaxMergeSegment(ByteSizeValue maxMergeSegment) { + logByteSizeMergePolicy.setMaxMergeMB(maxMergeSegment.getMbFrac()); + } + + void setLBSMinMergedMB(ByteSizeValue minMergedMB) { + logByteSizeMergePolicy.setMinMergeMB(minMergedMB.getMbFrac()); + } + + void setLBSMaxMergeMBForForcedMerge(ByteSizeValue maxMergeMBForcedMerge) { + logByteSizeMergePolicy.setMaxMergeMBForForcedMerge(maxMergeMBForcedMerge.getMbFrac()); + } + + void setLBSMaxMergeDocs(int maxMergeDocs) { + logByteSizeMergePolicy.setMaxMergeDocs(maxMergeDocs); + } + + void setLBSNoCFSRatio(Double noCFSRatio) { + logByteSizeMergePolicy.setNoCFSRatio(noCFSRatio); } private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) { @@ -286,7 +408,11 @@ private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerT } MergePolicy getMergePolicy() { - return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE; + return mergesEnabled ? openSearchTieredMergePolicy : NoMergePolicy.INSTANCE; + } + + MergePolicy getLogByteSizeMergePolicy() { + return mergesEnabled ? logByteSizeMergePolicy : NoMergePolicy.INSTANCE; } private static double parseNoCFSRatio(String noCFSRatio) { diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 34c5ed2112482..3f8aa5305463e 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3769,7 +3769,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro indexSettings, warmer, store, - indexSettings.getMergePolicy(), + isTimeSeriesIndex ? indexSettings.getDataStreamMergePolicy() : indexSettings.getMergePolicy(), mapperService != null ? 
mapperService.indexAnalyzer() : null, similarityService.similarity(mapperService), engineConfigFactory.newCodecServiceOrDefault(indexSettings, mapperService, logger, codecService), diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index 387997892ee30..7a13463c4073b 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -31,6 +31,7 @@ package org.opensearch.index; +import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; @@ -72,6 +73,7 @@ public void testNoMerges() { indexSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build()) ); assertTrue(mp.getMergePolicy() instanceof NoMergePolicy); + assertTrue(mp.getLogByteSizeMergePolicy() instanceof NoMergePolicy); } public void testUpdateSettings() throws IOException { @@ -89,6 +91,55 @@ public void testUpdateSettings() throws IOException { assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.0)); } + public void testDefaultMergePolicy() throws IOException { + IndexSettings indexSettings = indexSettings(EMPTY_SETTINGS); + assertTrue(indexSettings.getMergePolicy() instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getDataStreamMergePolicy() instanceof OpenSearchTieredMergePolicy); + } + + public void testUpdateSettingsForLogByteSizeMergePolicy() throws IOException { + IndexSettings indexSettings = indexSettings( + Settings.builder().put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size").build() + ); + + assertThat(indexSettings.getDataStreamMergePolicy().getNoCFSRatio(), equalTo(0.1)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.9) + .build() + ); + assertThat((indexSettings.getDataStreamMergePolicy()).getNoCFSRatio(), equalTo(0.9)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.1) + .build() + ); + assertThat((indexSettings.getDataStreamMergePolicy()).getNoCFSRatio(), equalTo(0.1)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.0) + .build() + ); + assertThat((indexSettings.getDataStreamMergePolicy()).getNoCFSRatio(), equalTo(0.0)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), "true") + .build() + ); + assertThat((indexSettings.getDataStreamMergePolicy()).getNoCFSRatio(), equalTo(1.0)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), "false") + .build() + ); + assertThat((indexSettings.getDataStreamMergePolicy()).getNoCFSRatio(), equalTo(0.0)); + } + public void testTieredMergePolicySettingsUpdate() throws IOException { IndexSettings indexSettings = 
indexSettings(Settings.EMPTY); assertEquals( @@ -256,6 +307,112 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { ); } + public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { + + IndexSettings indexSettings = indexSettings( + Settings.builder().put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size").build() + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getDataStreamMergePolicy()).getMergeFactor(), + MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put( + MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), + MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE + 1 + ) + .build() + ) + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getDataStreamMergePolicy()).getMergeFactor(), + MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE + 1 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put( + MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING.getKey(), + new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB) + ) + .build() + ) + ); + + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getDataStreamMergePolicy()).getMinMergeMB(), + new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put( + MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING.getKey(), + new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getMb() + 100, ByteSizeUnit.MB) + ) + .build() + ) + ); + + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getDataStreamMergePolicy()).getMaxMergeMB(), + new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getMb() + 100, ByteSizeUnit.MB).getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put( + MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING.getKey(), + new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB) + ) + .build() + ) + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getDataStreamMergePolicy()).getMaxMergeMBForForcedMerge(), + new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put(MergePolicyConfig.INDEX_LBS_MAX_MERGED_DOCS_SETTING.getKey(), 10000000) + .build() + ) + ); + assertEquals(((LogByteSizeMergePolicy) indexSettings.getDataStreamMergePolicy()).getMaxMergeDocs(), 10000000); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size") + .put(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.1) + .build() + ) + ); + assertEquals(indexSettings.getDataStreamMergePolicy().getNoCFSRatio(), 0.1, 0.0); + } + 
public Settings build(String value) { return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); }
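A minimal sketch of how the new settings compose, following the Settings.builder() pattern used in MergePolicySettingsTests above. The class name and the chosen values are illustrative assumptions; only the setting keys and constants come from this change.

import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.MergePolicyConfig;

/**
 * Sketch: build index settings that opt a time-series (data stream) index into the
 * LogByteSizeMergePolicy and tune two of the new log_byte_size knobs.
 */
public class DataStreamMergePolicySketch {
    public static Settings logByteSizeSettings() {
        return Settings.builder()
            // switch the per-index data stream merge policy from the default "tiered" to "log_byte_size"
            .put(IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey(), "log_byte_size")
            // merge up to 16 adjacent segments at a time (the default follows DEFAULT_MAX_MERGE_AT_ONCE)
            .put(MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), 16)
            // a no_cfs_ratio of 0.0 means merged segments are never written in compound format
            .put(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.0)
            .build();
    }
}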