Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Allowing non-rollup and rollup indices to be searched together (#1268) #1283

Open
wants to merge 2 commits into
base: 2.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_DASHBOARDS,
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT,
TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS,
TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,18 @@ class RollupInterceptor(

@Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings)

@Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) {
searchEnabled = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) {
searchAllJobs = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES) {
searchRawRollupIndices = it
}
}

@Suppress("SpreadOperator")
Expand Down Expand Up @@ -143,13 +148,15 @@ class RollupInterceptor(
var allMatchingRollupJobs: Map<Rollup, Set<RollupFieldMapping>> = mapOf()
for (concreteIndex in concreteIndices) {
val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs()
?: throw IllegalArgumentException("Not all indices have rollup job")

val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
if (rollupJobs != null) {
val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs)
if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) {
throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues")
}
allMatchingRollupJobs += matchingRollupJobs
} else if (!searchRawRollupIndices) {
throw IllegalArgumentException("Not all indices have rollup job")
}
allMatchingRollupJobs += matchingRollupJobs
}
return allMatchingRollupJobs
}
Expand Down Expand Up @@ -342,6 +349,9 @@ class RollupInterceptor(
if (searchAllJobs) {
request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex))
} else {
if (matchingRollupJobs.keys.size > 1) {
logger.trace("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window")
}
request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ class RollupSettings {
companion object {
const val DEFAULT_ROLLUP_ENABLED = true
const val DEFAULT_SEARCH_ALL_JOBS = false
const val DEFAULT_SEARCH_SOURCE_INDICES = false
const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3
const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L
const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3
Expand Down Expand Up @@ -78,11 +79,20 @@ class RollupSettings {
Setting.Property.Dynamic,
)

val ROLLUP_DASHBOARDS: Setting<Boolean> = Setting.boolSetting(
"plugins.rollup.dashboards.enabled",
LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS,
Setting.Property.NodeScope,
Setting.Property.Dynamic,
)
val ROLLUP_SEARCH_SOURCE_INDICES: Setting<Boolean> =
Setting.boolSetting(
"plugins.rollup.search.search_source_indices",
DEFAULT_SEARCH_SOURCE_INDICES,
Setting.Property.NodeScope,
Setting.Property.Dynamic,
)

val ROLLUP_DASHBOARDS: Setting<Boolean> =
Setting.boolSetting(
"plugins.rollup.dashboards.enabled",
LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS,
Setting.Property.NodeScope,
Setting.Property.Dynamic,
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
RollupSettings.ROLLUP_ENABLED,
RollupSettings.ROLLUP_SEARCH_ENABLED,
RollupSettings.ROLLUP_SEARCH_ALL_JOBS,
RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES,
RollupSettings.ROLLUP_DASHBOARDS,
SnapshotManagementSettings.FILTER_BY_BACKEND_ROLES,
),
Expand Down Expand Up @@ -177,6 +178,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false)
assertEquals(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings), false)
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1)
assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,24 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() {
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun updateSearchRawRollupClusterSetting(value: Boolean) {
val formattedValue = "\"${value}\""
val request =
"""
{
"persistent": {
"${RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.key}": $formattedValue
}
}
""".trimIndent()
val res =
client().makeRequest(
"PUT", "_cluster/settings", emptyMap(),
StringEntity(request, APPLICATION_JSON),
)
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun createSampleIndexForQSQTest(index: String) {
val mapping = """
"properties": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1040,12 +1040,12 @@ class RollupInterceptorIT : RollupRestTestCase() {
},
"aggs": {
"sum_passenger_count": { "sum": { "field": "passenger_count" } },
"max_passenger_count": { "max": { "field": "passenger_count" } },
"value_count_passenger_count": { "value_count": { "field": "passenger_count" } }
"max_passenger_count": { "max": { "field": "passenger_count" } }
}
}
""".trimIndent()
// Search 1 non-rollup index and 1 rollup
// Search 1 non-rollup index and 1 rollup
updateSearchRawRollupClusterSetting(false)
val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(searchResult1.restStatus() == RestStatus.OK)
val failures = extractFailuresFromSearchResponse(searchResult1)
Expand All @@ -1061,6 +1061,29 @@ class RollupInterceptorIT : RollupRestTestCase() {
"Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response",
)

// Updating to allow searching on non-rollup and rolled-up index together
updateSearchRawRollupClusterSetting(true)
val rawRes1 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes1.restStatus() == RestStatus.OK)
val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(rawRes2.restStatus() == RestStatus.OK)
val searchResult = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON))
assertTrue(searchResult.restStatus() == RestStatus.OK)
val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map<String, Map<String, Any>>
val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map<String, Map<String, Any>>
val rollupAggResMulti = searchResult.asMap()["aggregations"] as Map<String, Map<String, Any>>

val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double

assertEquals(
"Searching single raw source index and rollup target index did not return the same sum results",
rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"],
)
assertEquals(
"Searching rollup target index did not return the sum for all of the rollup jobs on the index",
trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"],
)

// Search 2 rollups with different mappings
try {
client().makeRequest(
Expand Down
Loading