Skip to content

Commit

Permalink
[Backport 2.x] Enhance per bucket, and per document monitor notificat…
Browse files Browse the repository at this point in the history
…ion message ctx. (opensearch-project#1450) (opensearch-project#1477)

* Enhance per bucket, and per document monitor notification message ctx. (opensearch-project#1450)

* Adding dev logs.

Signed-off-by: AWSHurneyt <[email protected]>

* Added support for returning sample documents for bucket level monitors.

Signed-off-by: AWSHurneyt <[email protected]>

* Added support for printing query/rule info in notification messages.

Signed-off-by: AWSHurneyt <[email protected]>

* Extracted out helper function.

Signed-off-by: AWSHurneyt <[email protected]>

* Extracted out helper function.

Signed-off-by: AWSHurneyt <[email protected]>

* Added support for printing document data in notification messages for document level monitors.

Signed-off-by: AWSHurneyt <[email protected]>

* Refactored logic after making AlertContext a separate class from Alert instead of inheriting/extending it in common utils.

Signed-off-by: AWSHurneyt <[email protected]>

* Moved AlertContext data model from common utils to alerting plugin.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <[email protected]>

* Added additional unit tests.

Signed-off-by: AWSHurneyt <[email protected]>

* Extracted sample doc aggs logic into helper function. Added support for sorting sample docs based on metric aggregations.

Signed-off-by: AWSHurneyt <[email protected]>

* Extracted get sample doc logic into helper function. Added sorting for sample docs.

Signed-off-by: AWSHurneyt <[email protected]>

* Removed dev code.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint errors.

Signed-off-by: AWSHurneyt <[email protected]>

* Added comments based on PR feedback.

Signed-off-by: AWSHurneyt <[email protected]>

* Added logic to make mGet calls in batches.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>

(cherry picked from commit 5dc690c)
Signed-off-by: AWSHurneyt <[email protected]>

* Fixed imports.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Mar 15, 2024
1 parent dde6aab commit ae046ee
Show file tree
Hide file tree
Showing 14 changed files with 1,388 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.opensearchapi.withClosableContext
Expand All @@ -25,7 +27,9 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -221,6 +225,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
}
}

// The alertSampleDocs map structure is Map<TriggerId, Map<BucketKeysHash, List<SampleDoc>>>
val alertSampleDocs = mutableMapOf<String, Map<String, List<Map<String, Any>>>>()
for (trigger in monitor.triggers) {
val alertsToUpdate = mutableSetOf<Alert>()
val completedAlertsToUpdate = mutableSetOf<Alert>()
Expand All @@ -231,6 +237,32 @@ object BucketLevelMonitorRunner : MonitorRunner() {
?: mutableListOf()
// Update nextAlerts so the filtered DEDUPED Alerts are reflected for PER_ALERT Action execution
nextAlerts[trigger.id]?.set(AlertCategory.DEDUPED, dedupedAlerts)

// Only collect sample docs for triggered triggers, and only when at least 1 action prints sample doc data.
val isTriggered = !nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty()
if (isTriggered && printsSampleDocData(trigger)) {
try {
val searchRequest = monitorCtx.inputService!!.getSearchRequest(
monitor = monitor.copy(triggers = listOf(trigger)),
searchInput = monitor.inputs[0] as SearchInput,
periodStart = periodStart,
periodEnd = periodEnd,
prevResult = monitorResult.inputResults,
matchingDocIdsPerIndex = null,
returnSampleDocs = true
)
val sampleDocumentsByBucket = getSampleDocs(
client = monitorCtx.client!!,
monitorId = monitor.id,
triggerId = trigger.id,
searchRequest = searchRequest
)
alertSampleDocs[trigger.id] = sampleDocumentsByBucket
} catch (e: Exception) {
logger.error("Error retrieving sample documents for trigger {} of monitor {}.", trigger.id, monitor.id, e)
}
}

val newAlerts = nextAlerts[trigger.id]?.get(AlertCategory.NEW) ?: mutableListOf()
val completedAlerts = nextAlerts[trigger.id]?.get(AlertCategory.COMPLETED) ?: mutableListOf()

Expand All @@ -256,9 +288,12 @@ object BucketLevelMonitorRunner : MonitorRunner() {
for (alertCategory in actionExecutionScope.actionableAlerts) {
val alertsToExecuteActionsFor = nextAlerts[trigger.id]?.get(alertCategory) ?: mutableListOf()
for (alert in alertsToExecuteActionsFor) {
val alertContext = if (alertCategory != AlertCategory.NEW) AlertContext(alert = alert)
else getAlertContext(alert = alert, alertSampleDocs = alertSampleDocs)

val actionCtx = getActionContextForAlertCategory(
alertCategory,
alert,
alertContext,
triggerCtx,
monitorOrTriggerError
)
Expand Down Expand Up @@ -292,7 +327,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {

val actionCtx = triggerCtx.copy(
dedupedAlerts = dedupedAlerts,
newAlerts = newAlerts,
newAlerts = newAlerts.map {
getAlertContext(alert = it, alertSampleDocs = alertSampleDocs)
},
completedAlerts = completedAlerts,
error = monitorResult.error ?: triggerResult.error
)
Expand Down Expand Up @@ -487,17 +524,93 @@ object BucketLevelMonitorRunner : MonitorRunner() {

/**
 * Builds the trigger execution context handed to an action that is executed for a single alert.
 *
 * Returns a copy of [ctx] in which only the alert list matching [alertCategory] holds one entry, the other
 * two alert lists are empty, and [error] is set on the copy in every branch.
 *
 * NOTE(review): this listing shows both the pre-change lines (taking `alert: Alert`) and the post-change
 * lines (taking `alertContext: AlertContext`) side by side — confirm against the checked-in file which
 * set is live before relying on the parameter list.
 */
private fun getActionContextForAlertCategory(
alertCategory: AlertCategory,
alert: Alert,
alertContext: AlertContext,
ctx: BucketLevelTriggerExecutionContext,
error: Exception?
): BucketLevelTriggerExecutionContext {
// No `else` branch: the `when` enumerates the DEDUPED, NEW and COMPLETED categories explicitly.
return when (alertCategory) {
AlertCategory.DEDUPED ->
ctx.copy(dedupedAlerts = listOf(alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
ctx.copy(dedupedAlerts = listOf(alertContext.alert), newAlerts = emptyList(), completedAlerts = emptyList(), error = error)
AlertCategory.NEW ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alert), completedAlerts = emptyList(), error = error)
ctx.copy(dedupedAlerts = emptyList(), newAlerts = listOf(alertContext), completedAlerts = emptyList(), error = error)
AlertCategory.COMPLETED ->
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alert), error = error)
ctx.copy(dedupedAlerts = emptyList(), newAlerts = emptyList(), completedAlerts = listOf(alertContext.alert), error = error)
}
}

/**
 * Wraps [alert] in an [AlertContext], attaching the sample documents recorded for the alert's bucket.
 *
 * The documents are looked up in [alertSampleDocs] first by the alert's trigger ID and then by the hash of
 * the alert's aggregation bucket keys. When the bucket key hash is null/empty, or no documents are recorded
 * for it, an error is logged and the returned context carries an empty sample document list.
 *
 * @param alert the alert to wrap.
 * @param alertSampleDocs sample documents keyed by trigger ID, then by bucket keys hash.
 * @return the context for [alert]; its sample document list is empty when none could be resolved.
 */
private fun getAlertContext(
    alert: Alert,
    alertSampleDocs: Map<String, Map<String, List<Map<String, Any>>>>
): AlertContext {
    val bucketKeysHash = alert.aggregationResultBucket?.getBucketKeysHash()

    // Resolve to a non-null list up front so a single emptiness check covers every failure case.
    val docsForBucket: List<Map<String, Any>> =
        if (bucketKeysHash.isNullOrEmpty()) emptyList()
        else alertSampleDocs[alert.triggerId]?.get(bucketKeysHash).orEmpty()

    if (docsForBucket.isEmpty()) {
        logger.error(
            "Failed to retrieve sample documents for alert {} from trigger {} of monitor {} during execution {}.",
            alert.id,
            alert.triggerId,
            alert.monitorId,
            alert.executionId
        )
    }
    return AlertContext(alert = alert, sampleDocs = docsForBucket)
}

/**
 * Runs [searchRequest] and gathers the sample documents reported for every bucket of the
 * "composite_agg" aggregation found in the response.
 *
 * For each bucket, the hits of its "top_hits" and "low_hits" sub-aggregations are concatenated (with the
 * low hits reversed) and any hit whose "_id" was already seen is dropped, keeping the first occurrence.
 *
 * @param client client used to execute the search.
 * @param monitorId ID of the monitor; only used in log output.
 * @param triggerId ID of the trigger; only used in log output.
 * @param searchRequest the search to execute.
 * @return Map<BucketKeysHash, List<SampleDocHit>>, each hit being the map form of a search hit.
 * @throws IllegalStateException when the keys of a bucket hash to an empty string.
 */
@Suppress("UNCHECKED_CAST")
private suspend fun getSampleDocs(
    client: Client,
    monitorId: String,
    triggerId: String,
    searchRequest: SearchRequest
): Map<String, List<Map<String, Any>>> {
    // Unwraps `bucket[aggName]["hits"]["hits"]`, substituting an empty container for any missing level.
    fun hitsOf(bucket: Map<String, Any>, aggName: String): List<Map<String, Any>> {
        val subAgg = bucket.getOrDefault(aggName, mapOf<String, Any>()) as Map<String, Any>
        val hitsWrapper = subAgg.getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
        return hitsWrapper.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>
    }

    val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
    val aggregations = searchResponse.convertToMap().getOrDefault("aggregations", mapOf<String, Any>()) as Map<String, Any>
    val composite = aggregations.getOrDefault("composite_agg", mapOf<String, Any>()) as Map<String, Any>
    val compositeBuckets = composite.getOrDefault("buckets", emptyList<Map<String, Any>>()) as List<Map<String, Any>>

    val docsByBucket = mutableMapOf<String, List<Map<String, Any>>>()
    for (bucket in compositeBuckets) {
        val keyValues = (bucket.getOrDefault("key", mapOf<String, String>()) as Map<String, String>).values.toList()
        val bucketKeysHash = getBucketKeysHash(keyValues)
        check(bucketKeysHash.isNotEmpty()) { "Cannot format bucket keys." }

        // Reversing the order of the low hits so the combined list will be in descending order.
        val combinedHits = hitsOf(bucket, "top_hits") + hitsOf(bucket, "low_hits").reversed()

        if (combinedHits.isEmpty()) {
            // We expect sample documents to be available for each bucket.
            logger.error("Sample documents not found for trigger {} of monitor {}.", triggerId, monitorId)
        }

        // Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each.
        // The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data.
        docsByBucket[bucketKeysHash] = combinedHits.distinctBy { hit -> hit["_id"] as String }
    }

    return docsByBucket
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,22 +13,28 @@ import org.opensearch.action.admin.indices.refresh.RefreshAction
import org.opensearch.action.admin.indices.refresh.RefreshRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.get.MultiGetItemResponse
import org.opensearch.action.get.MultiGetRequest
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.IndexExecutionContext
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.userErrorMessage
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.parseSampleDocTags
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
Expand Down Expand Up @@ -64,6 +70,7 @@ import org.opensearch.percolator.PercolateQueryBuilderExt
import org.opensearch.search.SearchHit
import org.opensearch.search.SearchHits
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.search.fetch.subphase.FetchSourceContext
import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
Expand All @@ -83,6 +90,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
* Docs are fetched from the source index per shard and transformed.*/
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()

// Maps a finding ID to the related document.
private val findingIdToDocSource = mutableMapOf<String, MultiGetItemResponse>()

override suspend fun runMonitor(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
Expand All @@ -95,6 +105,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
monitorCtx.findingsToTriggeredQueries = mutableMapOf()

try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
Expand Down Expand Up @@ -455,7 +466,15 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
error = monitorResult.error ?: triggerResult.error
)

if (printsSampleDocData(trigger) && triggerFindingDocPairs.isNotEmpty())
getDocSources(
findingToDocPairs = findingToDocPairs,
monitorCtx = monitorCtx,
monitor = monitor
)

val alerts = mutableListOf<Alert>()
val alertContexts = mutableListOf<AlertContext>()
triggerFindingDocPairs.forEach {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(it.first),
Expand All @@ -466,6 +485,18 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
workflorwRunContext = workflowRunContext
)
alerts.add(alert)

val docSource = findingIdToDocSource[alert.findingIds.first()]?.response?.convertToMap()

alertContexts.add(
AlertContext(
alert = alert,
associatedQueries = alert.findingIds.flatMap { findingId ->
monitorCtx.findingsToTriggeredQueries?.getOrDefault(findingId, emptyList()) ?: emptyList()
},
sampleDocs = listOfNotNull(docSource)
)
)
}

val shouldDefaultToPerExecution = defaultToPerExecutionAction(
Expand All @@ -479,13 +510,13 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
for (action in trigger.actions) {
val actionExecutionScope = action.getActionExecutionPolicy(monitor)!!.actionExecutionScope
if (actionExecutionScope is PerAlertActionScope && !shouldDefaultToPerExecution) {
for (alert in alerts) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alert)), monitorCtx, monitor, dryrun)
triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
for (alertContext in alertContexts) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = listOf(alertContext)), monitorCtx, monitor, dryrun)
triggerResult.actionResultsMap.getOrPut(alertContext.alert.id) { mutableMapOf() }
triggerResult.actionResultsMap[alertContext.alert.id]?.set(action.id, actionResults)
}
} else if (alerts.isNotEmpty()) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = alerts), monitorCtx, monitor, dryrun)
} else if (alertContexts.isNotEmpty()) {
val actionResults = this.runAction(action, actionCtx.copy(alerts = alertContexts), monitorCtx, monitor, dryrun)
for (alert in alerts) {
triggerResult.actionResultsMap.getOrPut(alert.id) { mutableMapOf() }
triggerResult.actionResultsMap[alert.id]?.set(action.id, actionResults)
Expand Down Expand Up @@ -532,6 +563,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()
val findingsToTriggeredQueries = mutableMapOf<String, List<DocLevelQuery>>()

docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
Expand All @@ -552,6 +584,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
)
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)
findingsToTriggeredQueries[finding.id] = triggeredQueries

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
Expand All @@ -578,6 +611,10 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
// suppress exception
logger.error("Optional finding callback failed", e)
}

if (monitorCtx.findingsToTriggeredQueries == null) monitorCtx.findingsToTriggeredQueries = findingsToTriggeredQueries
else monitorCtx.findingsToTriggeredQueries = monitorCtx.findingsToTriggeredQueries!! + findingsToTriggeredQueries

return findingDocPairs
}

Expand Down Expand Up @@ -1047,6 +1084,40 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return numDocs >= maxNumDocsThreshold
}

/**
 * Performs mGet requests to retrieve the documents associated with findings, and records each returned
 * item in `findingIdToDocSource` under its finding ID.
 *
 * The finding/document pairs are processed in batches of `monitorCtx.findingsIndexBatchSize`; each batch
 * is fetched with exactly one mGet request. Pairs with an empty finding ID, or whose second element is not
 * of the form `<docId>|<concreteIndex>` with both parts non-empty, are skipped.
 *
 * When possible, this will only retrieve the document fields that are specifically
 * referenced for printing in the mustache template.
 *
 * @param findingToDocPairs pairs of finding ID to `<docId>|<concreteIndex>`.
 * @param monitorCtx execution context supplying the client and the batch size.
 * @param monitor monitor whose triggers are scanned for the document fields to fetch.
 */
private suspend fun getDocSources(
    findingToDocPairs: List<Pair<String, String>>,
    monitorCtx: MonitorRunnerExecutionContext,
    monitor: Monitor
) {
    val docFieldTags = parseSampleDocTags(monitor.triggers)

    // Perform mGet request in batches.
    findingToDocPairs.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch ->
        // A fresh request per batch; reusing one request would re-fetch every earlier document.
        val request = MultiGetRequest()
        // Finding IDs in the same order as the items added to the request, used to pair up the responses.
        val requestedFindingIds = mutableListOf<String>()

        batch.forEach { (findingId, docIdAndIndex) ->
            val docIdAndIndexSplit = docIdAndIndex.split("|")
            val docId = docIdAndIndexSplit[0]
            // Tolerate a value without the "|" separator instead of failing with an index error.
            val concreteIndex = docIdAndIndexSplit.getOrElse(1) { "" }
            if (findingId.isNotEmpty() && docId.isNotEmpty() && concreteIndex.isNotEmpty()) {
                val docItem = MultiGetRequest.Item(concreteIndex, docId)
                if (docFieldTags.isNotEmpty())
                    docItem.fetchSourceContext(FetchSourceContext(true, docFieldTags.toTypedArray(), emptyArray()))
                request.add(docItem)
                requestedFindingIds.add(findingId)
            }
        }

        // Skip the call when nothing in the batch was valid, rather than sending a request with no items.
        if (requestedFindingIds.isNotEmpty()) {
            val response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.multiGet(request, it) }
            // Item responses are paired with finding IDs by position.
            // NOTE(review): relies on mGet returning items in request order — confirm against the API docs.
            requestedFindingIds.zip(response.responses).forEach { (findingId, item) ->
                findingIdToDocSource[findingId] = item
            }
        }
    }
}

/**
* POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name
* and doc source. A list of these POJOs would be passed to percolate query execution logic.
Expand Down
Loading

0 comments on commit ae046ee

Please sign in to comment.