Skip to content

Commit

Permalink
Configurable merge policy for time-based indices
Browse files Browse the repository at this point in the history
  • Loading branch information
rishabhmaurya committed Sep 18, 2023
1 parent 5b864c0 commit 38fb06a
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107))
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
- Configurable merge policy for datastreams with an option to choose between LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
IndexSettings.INDEX_DATASTREAM_MERGE_POLICY,
MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING,
MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING,
MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING,
MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING,
MergePolicyConfig.INDEX_LBS_MAX_MERGED_DOCS_SETTING,
MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING,
IndexSettings.DEFAULT_SEARCH_PIPELINE,

// Settings for Searchable Snapshots
Expand Down
69 changes: 69 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@
@PublicApi(since = "1.0.0")
public final class IndexSettings {
private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default";

private static final String DATASTREAM_TIERED_MERGE_POLICY = "tiered";
private static final String DATASTREAM_DEFAULT_POLICY = DATASTREAM_TIERED_MERGE_POLICY;

private static final String DATASTREAM_LOG_BYTE_SIZE_MERGE_POLICY = "log_byte_size";

private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush";

public static final Setting<List<String>> DEFAULT_FIELD_SETTING = Setting.listSetting(
Expand Down Expand Up @@ -571,6 +577,13 @@ public final class IndexSettings {
Property.Dynamic
);

/**
 * Selects which merge policy backs a datastream (time-based) index: {@code "tiered"} (the default,
 * via DATASTREAM_DEFAULT_POLICY) or {@code "log_byte_size"}. Dynamic and index-scoped; the value is
 * resolved into a concrete {@link MergePolicy} by setDataStreamDefaultPolicy(String).
 */
public static final Setting<String> INDEX_DATASTREAM_MERGE_POLICY = Setting.simpleString(
"index.datastream_merge.policy",
DATASTREAM_DEFAULT_POLICY,
Property.IndexScope,
Property.Dynamic
);

public static final Setting<String> SEARCHABLE_SNAPSHOT_REPOSITORY = Setting.simpleString(
"index.searchable_snapshot.repository",
Property.IndexScope,
Expand Down Expand Up @@ -729,6 +742,8 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) {
*/
private volatile UnaryOperator<MergePolicy> mergeOnFlushPolicy;

private volatile MergePolicy dataStreamMergePolicy;

/**
* Returns the default search fields for this index.
*/
Expand Down Expand Up @@ -858,6 +873,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME);
mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED);
setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY));
setDataStreamDefaultPolicy(scopedSettings.get(INDEX_DATASTREAM_MERGE_POLICY));

defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE);
/* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7).
* For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type
Expand All @@ -872,6 +889,34 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
mergePolicyConfig::setDeletesPctAllowed
);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING,
mergePolicyConfig::setLBSMergeFactor
);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING,
mergePolicyConfig::setLBSMinMergedMB
);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING,
mergePolicyConfig::setLBSMaxMergeSegment
);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING,
mergePolicyConfig::setLBSMaxMergeMBForForcedMerge
);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_LBS_MAX_MERGED_DOCS_SETTING,
mergePolicyConfig::setLBSMaxMergeDocs
);

scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_LBS_NO_CFS_RATIO_SETTING, mergePolicyConfig::setLBSNoCFSRatio);

scopedSettings.addSettingsUpdateConsumer(
MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
mergePolicyConfig::setExpungeDeletesAllowed
Expand Down Expand Up @@ -940,6 +985,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, this::setMaxFullFlushMergeWaitTime);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_ENABLED, this::setMergeOnFlushEnabled);
scopedSettings.addSettingsUpdateConsumer(INDEX_MERGE_ON_FLUSH_POLICY, this::setMergeOnFlushPolicy);
scopedSettings.addSettingsUpdateConsumer(INDEX_DATASTREAM_MERGE_POLICY, this::setDataStreamDefaultPolicy);
scopedSettings.addSettingsUpdateConsumer(DEFAULT_SEARCH_PIPELINE, this::setDefaultSearchPipeline);
scopedSettings.addSettingsUpdateConsumer(
INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING,
Expand Down Expand Up @@ -1444,6 +1490,10 @@ public MergePolicy getMergePolicy() {
return mergePolicyConfig.getMergePolicy();
}

/**
 * Returns the merge policy selected for this datastream index via the
 * {@code index.datastream_merge.policy} setting (tiered- or log-byte-size-backed).
 */
public MergePolicy getDataStreamMergePolicy() {
return dataStreamMergePolicy;
}

/**
 * Reads the current value of the given setting from this index's scoped settings.
 *
 * @param setting the setting to look up
 * @return the current (possibly dynamically updated) value
 */
public <T> T getValue(Setting<T> setting) {
return scopedSettings.get(setting);
}
Expand Down Expand Up @@ -1650,6 +1700,25 @@ private void setMergeOnFlushPolicy(String policy) {
}
}

/**
 * Resolves the {@code index.datastream_merge.policy} value into the concrete merge policy
 * used for this datastream index.
 *
 * @param policy either {@code "tiered"} or {@code "log_byte_size"}
 * @throws IllegalArgumentException if any other value is supplied
 */
private void setDataStreamDefaultPolicy(String policy) {
    switch (policy) {
        case DATASTREAM_TIERED_MERGE_POLICY:
            this.dataStreamMergePolicy = mergePolicyConfig.getMergePolicy();
            break;
        case DATASTREAM_LOG_BYTE_SIZE_MERGE_POLICY:
            this.dataStreamMergePolicy = mergePolicyConfig.getLogByteSizeMergePolicy();
            break;
        default:
            throw new IllegalArgumentException(
                "The "
                    + IndexSettings.INDEX_DATASTREAM_MERGE_POLICY.getKey()
                    + " has unsupported policy specified: "
                    + policy
                    + ". Please use one of: "
                    + DATASTREAM_TIERED_MERGE_POLICY
                    + ", "
                    + DATASTREAM_LOG_BYTE_SIZE_MERGE_POLICY
            );
    }
}

/**
 * Returns the merge-on-flush policy wrapper if one is configured, otherwise {@link Optional#empty()}.
 */
public Optional<UnaryOperator<MergePolicy>> getMergeOnFlushPolicy() {
return Optional.ofNullable(mergeOnFlushPolicy);
}
Expand Down
160 changes: 143 additions & 17 deletions server/src/main/java/org/opensearch/index/MergePolicyConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.index;

import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LogByteSizeMergePolicy;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.NoMergePolicy;
import org.apache.lucene.index.TieredMergePolicy;
Expand All @@ -41,15 +42,21 @@
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;

import static org.apache.lucene.index.LogMergePolicy.DEFAULT_MAX_MERGE_DOCS;
import static org.apache.lucene.index.LogMergePolicy.DEFAULT_NO_CFS_RATIO;

/**
* A shard in opensearch is a Lucene index, and a Lucene index is broken
* down into segments. Segments are internal storage elements in the index
* where the index data is stored, and are immutable up to delete markers.
* Segments are, periodically, merged into larger segments to keep the
* index size at bay and expunge deletes.
* This class customizes and exposes 2 merge policies from lucene -
* {@link LogByteSizeMergePolicy} and {@link TieredMergePolicy}.
*
*
* <p>
* Merges select segments of approximately equal size, subject to an allowed
 * The tiered merge policy selects segments of approximately equal size, subject to an allowed
* number of segments per tier. The merge policy is able to merge
* non-adjacent segments, and separates how many segments are merged at once from how many
* segments are allowed per tier. It also does not over-merge (i.e., cascade merges).
Expand Down Expand Up @@ -122,11 +129,37 @@
* possibly either increase the <code>max_merged_segment</code> or issue an optimize
* call for the index (try and aim to issue it on a low traffic time).
*
/**
* <p>
* The LogByteSizeMergePolicy is an alternative merge policy designed to optimize the merging of segments in scenarios
* with time-based data.
* While the TieredMergePolicy is the default choice, the LogByteSizeMergePolicy can be configured
* as the default merge policy for time-based data using the <code>index.datastream_merge.policy</code> setting.
*
* <p>
* Unlike the TieredMergePolicy, which prioritizes merging segments of equal sizes, the LogByteSizeMergePolicy
* specializes in merging adjacent segments efficiently.
* This characteristic makes it particularly well-suited for range queries on time-based data.
* Typically, adjacent segments in time-based data often contain documents with similar timestamps.
* When these segments are merged, the resulting segment covers a range of timestamps with reduced overlap compared
* to the adjacent segments. This reduced overlap remains even as segments grow older and larger,
* which can significantly benefit range queries on timestamps.
*
* <p>
* In contrast, the TieredMergePolicy does not honor this timestamp range optimization. It focuses on merging segments
* of equal sizes and does not consider adjacency. Consequently, as segments grow older and larger,
* the overlap of timestamp ranges among adjacent segments managed by TieredMergePolicy can increase.
* This can lead to inefficiencies in range queries on timestamps, as the number of segments to be scanned
* within a given timestamp range could become high.
*
* @opensearch.internal
*/

public final class MergePolicyConfig {
private final OpenSearchTieredMergePolicy mergePolicy = new OpenSearchTieredMergePolicy();
private final OpenSearchTieredMergePolicy openSearchTieredMergePolicy = new OpenSearchTieredMergePolicy();

private final LogByteSizeMergePolicy logByteSizeMergePolicy = new LogByteSizeMergePolicy();

private final Logger logger;
private final boolean mergesEnabled;

Expand All @@ -137,6 +170,12 @@ public final class MergePolicyConfig {
public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d;
public static final double DEFAULT_RECLAIM_DELETES_WEIGHT = 2.0d;
public static final double DEFAULT_DELETES_PCT_ALLOWED = 20.0d;

public static final ByteSizeValue DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE = new ByteSizeValue(
Long.MAX_VALUE / ByteSizeUnit.GB.toBytes(1),
ByteSizeUnit.GB
);

public static final Setting<Double> INDEX_COMPOUND_FORMAT_SETTING = new Setting<>(
"index.compound_format",
Double.toString(TieredMergePolicy.DEFAULT_NO_CFS_RATIO),
Expand Down Expand Up @@ -194,6 +233,53 @@ public final class MergePolicyConfig {
Property.Dynamic,
Property.IndexScope
);

// settings for LogByteSizeMergePolicy

// Applied via LogByteSizeMergePolicy#setMergeFactor; dynamic, index-scoped, floor of 2.
public static final Setting<Integer> INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting(
"index.merge.policy.log_byte_size.merge_factor",
DEFAULT_MAX_MERGE_AT_ONCE, // keeping it same as default max merge at once for tiered merge policy
2,
Property.Dynamic,
Property.IndexScope
);

// Applied via LogByteSizeMergePolicy#setMinMergeMB (value converted with getMbFrac()).
public static final Setting<ByteSizeValue> INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING = Setting.byteSizeSetting(
"index.merge.policy.log_byte_size.min_merge_mb",
DEFAULT_FLOOR_SEGMENT, // keeping it same as default floor segment for tiered merge policy
Property.Dynamic,
Property.IndexScope
);

// Applied via LogByteSizeMergePolicy#setMaxMergeMB (value converted with getMbFrac()).
public static final Setting<ByteSizeValue> INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING = Setting.byteSizeSetting(
"index.merge.policy.log_byte_size.max_merge_segment_mb",
DEFAULT_MAX_MERGED_SEGMENT, // keeping default same as tiered merge policy
Property.Dynamic,
Property.IndexScope
);

// Applied via LogByteSizeMergePolicy#setMaxMergeMBForForcedMerge; the default
// (DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE, Long.MAX_VALUE scaled to GB) is effectively unbounded.
public static final Setting<ByteSizeValue> INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting(
"index.merge.policy.log_byte_size.max_merge_segment_mb_forced_merge",
DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE,
Property.Dynamic,
Property.IndexScope
);

// Applied via LogByteSizeMergePolicy#setMaxMergeDocs; default taken from Lucene's LogMergePolicy.
public static final Setting<Integer> INDEX_LBS_MAX_MERGED_DOCS_SETTING = Setting.intSetting(
"index.merge.policy.log_byte_size.max_merged_docs",
DEFAULT_MAX_MERGE_DOCS,
Property.Dynamic,
Property.IndexScope
);

// Applied via LogByteSizeMergePolicy#setNoCFSRatio; parsed/validated by parseNoCFSRatio.
public static final Setting<Double> INDEX_LBS_NO_CFS_RATIO_SETTING = new Setting<>(
"index.merge.policy.log_byte_size.no_cfs_ratio",
Double.toString(DEFAULT_NO_CFS_RATIO),
MergePolicyConfig::parseNoCFSRatio,
Property.Dynamic,
Property.IndexScope
);

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
public static final String INDEX_MERGE_ENABLED = "index.merge.enabled";

Expand All @@ -216,13 +302,25 @@ public final class MergePolicyConfig {
);
}
maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier);
mergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING));
mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
mergePolicy.setFloorSegmentMB(floorSegment.getMbFrac());
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
mergePolicy.setSegmentsPerTier(segmentsPerTier);
mergePolicy.setDeletesPctAllowed(deletesPctAllowed);
openSearchTieredMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING));
openSearchTieredMergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed);
openSearchTieredMergePolicy.setFloorSegmentMB(floorSegment.getMbFrac());
openSearchTieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
openSearchTieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
openSearchTieredMergePolicy.setSegmentsPerTier(segmentsPerTier);
openSearchTieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed);

// Undocumented settings; they work well with their defaults

logByteSizeMergePolicy.setMergeFactor(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING));
logByteSizeMergePolicy.setMinMergeMB(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING).getMbFrac());
logByteSizeMergePolicy.setMaxMergeMB(indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING).getMbFrac());
logByteSizeMergePolicy.setMaxMergeMBForForcedMerge(
indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING).getMbFrac()
);
logByteSizeMergePolicy.setMaxMergeDocs(indexSettings.getValue(INDEX_LBS_MAX_MERGED_DOCS_SETTING));
logByteSizeMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_LBS_NO_CFS_RATIO_SETTING));

if (logger.isTraceEnabled()) {
logger.trace(
"using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}],"
Expand All @@ -239,31 +337,55 @@ public final class MergePolicyConfig {
}

void setSegmentsPerTier(Double segmentsPerTier) {
mergePolicy.setSegmentsPerTier(segmentsPerTier);
openSearchTieredMergePolicy.setSegmentsPerTier(segmentsPerTier);
}

void setMaxMergedSegment(ByteSizeValue maxMergedSegment) {
mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
openSearchTieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac());
}

void setMaxMergesAtOnce(Integer maxMergeAtOnce) {
mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
openSearchTieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce);
}

void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) {
mergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac());
openSearchTieredMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac());
}

void setExpungeDeletesAllowed(Double value) {
mergePolicy.setForceMergeDeletesPctAllowed(value);
openSearchTieredMergePolicy.setForceMergeDeletesPctAllowed(value);
}

void setNoCFSRatio(Double noCFSRatio) {
mergePolicy.setNoCFSRatio(noCFSRatio);
openSearchTieredMergePolicy.setNoCFSRatio(noCFSRatio);
}

void setDeletesPctAllowed(Double deletesPctAllowed) {
mergePolicy.setDeletesPctAllowed(deletesPctAllowed);
openSearchTieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed);
}

/** Dynamic-update hook: forwards the log_byte_size merge_factor setting to the LogByteSizeMergePolicy. */
void setLBSMergeFactor(int mergeFactor) {
logByteSizeMergePolicy.setMergeFactor(mergeFactor);
}

/** Dynamic-update hook: forwards the log_byte_size max_merge_segment_mb setting (as MB) to the LogByteSizeMergePolicy. */
void setLBSMaxMergeSegment(ByteSizeValue maxMergeSegment) {
logByteSizeMergePolicy.setMaxMergeMB(maxMergeSegment.getMbFrac());
}

/** Dynamic-update hook: forwards the log_byte_size min_merge_mb setting (as MB) to the LogByteSizeMergePolicy. */
void setLBSMinMergedMB(ByteSizeValue minMergedMB) {
logByteSizeMergePolicy.setMinMergeMB(minMergedMB.getMbFrac());
}

/** Dynamic-update hook: forwards the log_byte_size max_merge_segment_mb_forced_merge setting (as MB) to the LogByteSizeMergePolicy. */
void setLBSMaxMergeMBForForcedMerge(ByteSizeValue maxMergeMBForcedMerge) {
logByteSizeMergePolicy.setMaxMergeMBForForcedMerge(maxMergeMBForcedMerge.getMbFrac());
}

/** Dynamic-update hook: forwards the log_byte_size max_merged_docs setting to the LogByteSizeMergePolicy. */
void setLBSMaxMergeDocs(int maxMergeDocs) {
logByteSizeMergePolicy.setMaxMergeDocs(maxMergeDocs);
}

/** Dynamic-update hook: forwards the log_byte_size no_cfs_ratio setting to the LogByteSizeMergePolicy. */
void setLBSNoCFSRatio(Double noCFSRatio) {
logByteSizeMergePolicy.setNoCFSRatio(noCFSRatio);
}

private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) {
Expand All @@ -286,7 +408,11 @@ private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerT
}

MergePolicy getMergePolicy() {
return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE;
return mergesEnabled ? openSearchTieredMergePolicy : NoMergePolicy.INSTANCE;
}

/**
 * Returns the configured {@link LogByteSizeMergePolicy}, or {@link NoMergePolicy#INSTANCE}
 * when merges are disabled for this index.
 */
MergePolicy getLogByteSizeMergePolicy() {
return mergesEnabled ? logByteSizeMergePolicy : NoMergePolicy.INSTANCE;
}

private static double parseNoCFSRatio(String noCFSRatio) {
Expand Down
Loading

0 comments on commit 38fb06a

Please sign in to comment.