diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt index 8709a6b98..58e050645 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/TransformSearchService.kt @@ -16,6 +16,7 @@ import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.index.IndexRequest import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse +import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING import org.opensearch.client.Client import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.service.ClusterService @@ -47,6 +48,7 @@ import org.opensearch.indexmanagement.util.IndexUtils.Companion.LUCENE_MAX_CLAUS import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.transform.settings.TransformSettings.Companion.MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS import org.opensearch.search.aggregations.Aggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregation import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder @@ -78,11 +80,17 @@ class TransformSearchService( @Volatile private var backoffPolicy = BackoffPolicy.constantBackoff(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS.get(settings), TRANSFORM_JOB_SEARCH_BACKOFF_COUNT.get(settings)) + @Volatile private var cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(settings) + init { clusterService.clusterSettings.addSettingsUpdateConsumer(TRANSFORM_JOB_SEARCH_BACKOFF_MILLIS, TRANSFORM_JOB_SEARCH_BACKOFF_COUNT) { millis, count -> backoffPolicy = BackoffPolicy.constantBackoff(millis, count) } + + clusterService.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) { + cancelAfterTimeInterval = it + } } @Suppress("RethrowCaughtException") @@ -187,7 +195,11 @@ class TransformSearchService( val searchStart = Instant.now().epochSecond val searchResponse = backoffPolicy.retryTransformSearch(logger, transformContext.transformLockManager) { val pageSizeDecay = 2f.pow(retryAttempt++) - val searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds() + + var searchRequestTimeoutInSeconds = transformContext.getMaxRequestTimeoutInSeconds() + if (searchRequestTimeoutInSeconds == null) { + searchRequestTimeoutInSeconds = getCancelAfterTimeIntervalSeconds(cancelAfterTimeInterval.seconds) + } client.suspendUntil { listener: ActionListener -> // If the previous request of the current transform job execution was successful, take the page size of previous request. @@ -224,6 +236,16 @@ class TransformSearchService( } } + private fun getCancelAfterTimeIntervalSeconds(givenIntervalSeconds: Long): Long { + // The default value for the cancelAfterTimeInterval is -1 and so, in this case + // we should ignore processing on the value + if (givenIntervalSeconds == -1L) { + return -1 + } + + return max(givenIntervalSeconds, MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS) + } + companion object { const val failedSearchErrorMessage = "Failed to search data in source indices" const val modifiedBucketsErrorMessage = "Failed to get the modified buckets in source indices" diff --git a/src/main/kotlin/org/opensearch/indexmanagement/transform/settings/TransformSettings.kt b/src/main/kotlin/org/opensearch/indexmanagement/transform/settings/TransformSettings.kt index 9ed375344..abfdc1b1f 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/transform/settings/TransformSettings.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/transform/settings/TransformSettings.kt @@ -14,6 +14,7 @@ class TransformSettings { companion object { const val DEFAULT_RENEW_LOCK_RETRY_COUNT = 3 const val DEFAULT_RENEW_LOCK_RETRY_DELAY = 1000L + const val MINIMUM_CANCEL_AFTER_TIME_INTERVAL_SECONDS = 600L val TRANSFORM_JOB_SEARCH_BACKOFF_COUNT: Setting = Setting.intSetting( "plugins.transform.internal.search.backoff_count",