From 48e6eb2f1206a19bfe353e651e91f45a871d7313 Mon Sep 17 00:00:00 2001 From: Kshitij Tandon Date: Fri, 27 Sep 2024 14:47:35 +0530 Subject: [PATCH 1/5] Allowing non-rollup and rollup indices to be searched together Signed-off-by: Kshitij Tandon --- .../indexmanagement/IndexManagementPlugin.kt | 1 + .../rollup/interceptor/RollupInterceptor.kt | 22 +++++++++++---- .../rollup/settings/RollupSettings.kt | 22 +++++++++++---- .../IndexManagementSettingsTests.kt | 2 ++ .../rollup/RollupRestTestCase.kt | 18 ++++++++++++ .../rollup/interceptor/RollupInterceptorIT.kt | 28 +++++++++++++++++++ 6 files changed, 81 insertions(+), 12 deletions(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index 9c79fb9e2..1dab11e64 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -527,6 +527,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin RollupSettings.ROLLUP_SEARCH_ENABLED, RollupSettings.ROLLUP_DASHBOARDS, RollupSettings.ROLLUP_SEARCH_ALL_JOBS, + RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_COUNT, TransformSettings.TRANSFORM_JOB_INDEX_BACKOFF_MILLIS, TransformSettings.TRANSFORM_JOB_SEARCH_BACKOFF_COUNT, diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index c65584afe..1d57494fe 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -64,6 +64,8 @@ class RollupInterceptor( @Volatile private var searchAllJobs = RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings) + @Volatile private var searchRawRollupIndices = RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings) + init { clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ENABLED) { searchEnabled = it @@ -71,6 +73,9 @@ class RollupInterceptor( clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_ALL_JOBS) { searchAllJobs = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES) { + searchRawRollupIndices = it + } } @Suppress("SpreadOperator") @@ -143,13 +148,15 @@ class RollupInterceptor( var allMatchingRollupJobs: Map> = mapOf() for (concreteIndex in concreteIndices) { val rollupJobs = clusterService.state().metadata.index(concreteIndex).getRollupJobs() - ?: throw IllegalArgumentException("Not all indices have rollup job") - - val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) - if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) { - throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") + if (rollupJobs != null) { + val (matchingRollupJobs, issues) = findMatchingRollupJobs(fieldMappings, rollupJobs) + if (issues.isNotEmpty() || matchingRollupJobs.isEmpty()) { + throw IllegalArgumentException("Could not find a rollup job that can answer this query because $issues") + } + allMatchingRollupJobs += matchingRollupJobs + } else if (!searchRawRollupIndices) { + throw IllegalArgumentException("Not all indices have rollup job") } - allMatchingRollupJobs += matchingRollupJobs } return allMatchingRollupJobs } @@ -342,6 +349,9 @@ class RollupInterceptor( if (searchAllJobs) { request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex)) } else { + if (matchingRollupJobs.keys.size > 1) { + logger.warn("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window") + } request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex)) } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt index bc7228853..1d11a36db 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/settings/RollupSettings.kt @@ -14,6 +14,7 @@ class RollupSettings { companion object { const val DEFAULT_ROLLUP_ENABLED = true const val DEFAULT_SEARCH_ALL_JOBS = false + const val DEFAULT_SEARCH_SOURCE_INDICES = false const val DEFAULT_ACQUIRE_LOCK_RETRY_COUNT = 3 const val DEFAULT_ACQUIRE_LOCK_RETRY_DELAY = 1000L const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3 @@ -78,11 +79,20 @@ class RollupSettings { Setting.Property.Dynamic, ) - val ROLLUP_DASHBOARDS: Setting = Setting.boolSetting( - "plugins.rollup.dashboards.enabled", - LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS, - Setting.Property.NodeScope, - Setting.Property.Dynamic, - ) + val ROLLUP_SEARCH_SOURCE_INDICES: Setting = + Setting.boolSetting( + "plugins.rollup.search.search_source_indices", + DEFAULT_SEARCH_SOURCE_INDICES, + Setting.Property.NodeScope, + Setting.Property.Dynamic, + ) + + val ROLLUP_DASHBOARDS: Setting = + Setting.boolSetting( + "plugins.rollup.dashboards.enabled", + LegacyOpenDistroRollupSettings.ROLLUP_DASHBOARDS, + Setting.Property.NodeScope, + Setting.Property.Dynamic, + ) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt index 7988ee2e2..97c47e0c5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementSettingsTests.kt @@ -94,6 +94,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { RollupSettings.ROLLUP_ENABLED, RollupSettings.ROLLUP_SEARCH_ENABLED, RollupSettings.ROLLUP_SEARCH_ALL_JOBS, + RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES, RollupSettings.ROLLUP_DASHBOARDS, SnapshotManagementSettings.FILTER_BY_BACKEND_ROLES, ), @@ -177,6 +178,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() { assertEquals(RollupSettings.ROLLUP_ENABLED.get(settings), false) assertEquals(RollupSettings.ROLLUP_SEARCH_ENABLED.get(settings), false) assertEquals(RollupSettings.ROLLUP_SEARCH_ALL_JOBS.get(settings), false) + assertEquals(RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.get(settings), false) assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) assertEquals(RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT.get(settings), 1) assertEquals(RollupSettings.ROLLUP_SEARCH_BACKOFF_MILLIS.get(settings), TimeValue.timeValueMillis(1)) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index c818d293e..25d2c47ee 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -263,6 +263,24 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { assertEquals("Request failed", RestStatus.OK, res.restStatus()) } + protected fun updateSearchRawRollupClusterSetting(value: Boolean) { + val formattedValue = "\"${value}\"" + val request = + """ + { + "persistent": { + "${RollupSettings.ROLLUP_SEARCH_SOURCE_INDICES.key}": $formattedValue + } + } + """.trimIndent() + val res = + client().makeRequest( + "PUT", "_cluster/settings", emptyMap(), + StringEntity(request, ContentType.APPLICATION_JSON), + ) + assertEquals("Request failed", RestStatus.OK, res.restStatus()) + } + protected fun createSampleIndexForQSQTest(index: String) { val mapping = """ "properties": { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 305fdfa2d..9752f5345 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1061,6 +1061,34 @@ class RollupInterceptorIT : RollupRestTestCase() { "Not all indices have rollup job", failures?.get(0)?.get("reason") ?: "Didn't find failure reason in search response", ) + // Updating to allow searching on non-rollup and rolled-up index together + updateSearchRawRollupClusterSetting(true) + val rawRes1 = client().makeRequest("POST", "/$sourceIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes1.restStatus() == RestStatus.OK) + val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(rawRes2.restStatus() == RestStatus.OK) + val searchResult2 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(searchResult2.restStatus() == RestStatus.OK) + val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map> + val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map> + val rollupAggResMulti = searchResult2.asMap()["aggregations"] as Map> + + val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int + val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double + + assertEquals( + "Searching single raw source index and rollup target index did not return the same sum results", + rawAgg1Res.getValue("max_passenger_count")["value"], rollupAggResMulti.getValue("max_passenger_count")["value"], + ) + assertEquals( + "Searching rollup target index did not return the sum for all of the rollup jobs on the index", + trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"], + ) + assertEquals( + "Searching rollup target index did not return the value count for all of the rollup jobs on the index", + trueAggCount, rollupAggResMulti.getValue("value_count_passenger_count")["value"], + ) + // Search 2 rollups with different mappings try { client().makeRequest( From ac73c2679cc150df8c1aeedaa5e90d4c3c0dd6f1 Mon Sep 17 00:00:00 2001 From: Kshitij Tandon Date: Fri, 27 Sep 2024 17:24:20 +0530 Subject: [PATCH 2/5] Fixing an issue in the integration test Signed-off-by: Kshitij Tandon --- .../rollup/interceptor/RollupInterceptorIT.kt | 19 +++++++------------ 1 file changed, 7 insertions(+), 12 deletions(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index 9752f5345..ffaca1ff8 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1040,12 +1040,12 @@ class RollupInterceptorIT : RollupRestTestCase() { }, "aggs": { "sum_passenger_count": { "sum": { "field": "passenger_count" } }, - "max_passenger_count": { "max": { "field": "passenger_count" } }, - "value_count_passenger_count": { "value_count": { "field": "passenger_count" } } + "max_passenger_count": { "max": { "field": "passenger_count" } } } } - """.trimIndent() - // Search 1 non-rollup index and 1 rollup + """.trimIndent() +// Search 1 non-rollup index and 1 rollup + updateSearchRawRollupClusterSetting(false) val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) assertTrue(searchResult1.restStatus() == RestStatus.OK) val failures = extractFailuresFromSearchResponse(searchResult1) @@ -1067,13 +1067,12 @@ class RollupInterceptorIT : RollupRestTestCase() { assertTrue(rawRes1.restStatus() == RestStatus.OK) val rawRes2 = client().makeRequest("POST", "/$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) assertTrue(rawRes2.restStatus() == RestStatus.OK) - val searchResult2 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) - assertTrue(searchResult2.restStatus() == RestStatus.OK) + val searchResult = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) + assertTrue(searchResult.restStatus() == RestStatus.OK) val rawAgg1Res = rawRes1.asMap()["aggregations"] as Map> val rawAgg2Res = rawRes2.asMap()["aggregations"] as Map> - val rollupAggResMulti = searchResult2.asMap()["aggregations"] as Map> + val rollupAggResMulti = searchResult.asMap()["aggregations"] as Map> - val trueAggCount = rawAgg1Res.getValue("value_count_passenger_count")["value"] as Int + rawAgg2Res.getValue("value_count_passenger_count")["value"] as Int val trueAggSum = rawAgg1Res.getValue("sum_passenger_count")["value"] as Double + rawAgg2Res.getValue("sum_passenger_count")["value"] as Double assertEquals( @@ -1084,10 +1083,6 @@ class RollupInterceptorIT : RollupRestTestCase() { "Searching rollup target index did not return the sum for all of the rollup jobs on the index", trueAggSum, rollupAggResMulti.getValue("sum_passenger_count")["value"], ) - assertEquals( - "Searching rollup target index did not return the value count for all of the rollup jobs on the index", - trueAggCount, rollupAggResMulti.getValue("value_count_passenger_count")["value"], - ) // Search 2 rollups with different mappings try { From 73cfc1a586503fddc06815d46b14f01ecb6891ef Mon Sep 17 00:00:00 2001 From: Kshitij Tandon Date: Fri, 18 Oct 2024 14:26:10 +0530 Subject: [PATCH 3/5] Using trace in place of warn in logger Signed-off-by: Kshitij Tandon --- .../indexmanagement/rollup/interceptor/RollupInterceptor.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt index 1d57494fe..ba72b6426 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptor.kt @@ -350,7 +350,7 @@ class RollupInterceptor( request.source(request.source().rewriteSearchSourceBuilder(matchingRollupJobs.keys, fieldNameMappingTypeMap, concreteSourceIndex)) } else { if (matchingRollupJobs.keys.size > 1) { - logger.warn("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window") + logger.trace("Trying search with search across multiple rollup jobs disabled so will give result with largest rollup window") } request.source(request.source().rewriteSearchSourceBuilder(matchedRollup, fieldNameMappingTypeMap, concreteSourceIndex)) } From f1707c8137fcc5130ff292f44722e9b0888f99d7 Mon Sep 17 00:00:00 2001 From: Kshitij Tandon Date: Tue, 29 Oct 2024 16:54:01 +0530 Subject: [PATCH 4/5] Removing whitespace causing ktlint failure Signed-off-by: Kshitij Tandon --- .../indexmanagement/rollup/interceptor/RollupInterceptorIT.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt index ffaca1ff8..7d74bbf42 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/interceptor/RollupInterceptorIT.kt @@ -1043,7 +1043,7 @@ class RollupInterceptorIT : RollupRestTestCase() { "max_passenger_count": { "max": { "field": "passenger_count" } } } } - """.trimIndent() + """.trimIndent() // Search 1 non-rollup index and 1 rollup updateSearchRawRollupClusterSetting(false) val searchResult1 = client().makeRequest("POST", "/$sourceIndex2,$targetIndex2/_search", emptyMap(), StringEntity(req, ContentType.APPLICATION_JSON)) From 6eb0dbb35c13a39c7e8487b14d5d25fa81c03cb2 Mon Sep 17 00:00:00 2001 From: Kshitij Tandon Date: Tue, 29 Oct 2024 17:08:58 +0530 Subject: [PATCH 5/5] Resolving a compile error Signed-off-by: Kshitij Tandon --- .../org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 25d2c47ee..47a7c67fe 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -276,7 +276,7 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { val res = client().makeRequest( "PUT", "_cluster/settings", emptyMap(), - StringEntity(request, ContentType.APPLICATION_JSON), + StringEntity(request, APPLICATION_JSON), ) assertEquals("Request failed", RestStatus.OK, res.restStatus()) }