Skip to content

Commit

Permalink
Extracted get sample doc logic into helper function. Added sorting fo…
Browse files Browse the repository at this point in the history
…r sample docs.

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Mar 12, 2024
1 parent a1dc077 commit b3c4f4b
Showing 1 changed file with 59 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -237,9 +238,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {

// Only collect sample docs for triggered triggers, and only when at least 1 action prints sample doc data.
val isTriggered = !nextAlerts[trigger.id]?.get(AlertCategory.NEW).isNullOrEmpty()
@Suppress("UNCHECKED_CAST")
if (isTriggered && printsSampleDocData(trigger)) {
val sampleDocumentsByBucket = mutableMapOf<String, List<Map<String, Any>>>()
try {
val searchRequest = monitorCtx.inputService!!.getSearchRequest(
monitor = monitor.copy(triggers = listOf(trigger)),
Expand All @@ -250,45 +249,13 @@ object BucketLevelMonitorRunner : MonitorRunner() {
matchingDocIdsPerIndex = null,
returnSampleDocs = true
)

val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(searchRequest, it) }
val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf<String, Any>()) as Map<String, Any>
val compositeAgg = aggs.getOrDefault("composite_agg", mapOf<String, Any>()) as Map<String, Any>
val buckets = compositeAgg.getOrDefault("buckets", emptyList<Map<String, Any>>()) as List<Map<String, Any>>

buckets.forEach { bucket ->
val bucketKey = getBucketKeysHash((bucket.getOrDefault("key", mapOf<String, String>()) as Map<String, String>).values.toList())
if (bucketKey.isEmpty()) throw IllegalStateException("Cannot format bucket keys.")

val unwrappedTopHits = (bucket.getOrDefault("top_hits", mapOf<String, Any>()) as Map<String, Any>)
.getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
val topHits = unwrappedTopHits.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>

val unwrappedLowHits = (bucket.getOrDefault("low_hits", mapOf<String, Any>()) as Map<String, Any>)
.getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
val lowHits = unwrappedLowHits.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>

val allHits = topHits + lowHits

if (allHits.isEmpty()) {
// We expect sample documents to be available for each bucket.
logger.error("Sample documents not found for trigger {} of monitor {}.", trigger.id, monitor.id)
}

// Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each.
// The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data.
val uniqueHitIds = mutableSetOf<String>()
val dedupedHits = mutableListOf<Map<String, Any>>()
allHits.forEach { hit ->
val hitId = hit["_id"] as String
if (!uniqueHitIds.contains(hitId)) {
uniqueHitIds.add(hitId)
dedupedHits.add(hit)
}
}
sampleDocumentsByBucket[bucketKey] = dedupedHits
}

monitorCtx.client
val sampleDocumentsByBucket = getSampleDocs(
client = monitorCtx.client!!,
monitorId = monitor.id,
triggerId = trigger.id,
searchRequest = searchRequest
)
alertSampleDocs[trigger.id] = sampleDocumentsByBucket
} catch (e: Exception) {
logger.error("Error retrieving sample documents for trigger {} of monitor {}.", trigger.id, monitor.id, e)
Expand Down Expand Up @@ -583,4 +550,55 @@ object BucketLevelMonitorRunner : MonitorRunner() {
AlertContext(alert = alert, sampleDocs = listOf())
}
}

@Suppress("UNCHECKED_CAST")
private suspend fun getSampleDocs(
client: Client,
monitorId: String,
triggerId: String,
searchRequest: SearchRequest
): Map<String, List<Map<String, Any>>> {
val sampleDocumentsByBucket = mutableMapOf<String, List<Map<String, Any>>>()

val searchResponse: SearchResponse = client.suspendUntil { client.search(searchRequest, it) }
val aggs = searchResponse.convertToMap().getOrDefault("aggregations", mapOf<String, Any>()) as Map<String, Any>
val compositeAgg = aggs.getOrDefault("composite_agg", mapOf<String, Any>()) as Map<String, Any>
val buckets = compositeAgg.getOrDefault("buckets", emptyList<Map<String, Any>>()) as List<Map<String, Any>>

buckets.forEach { bucket ->
val bucketKey = getBucketKeysHash((bucket.getOrDefault("key", mapOf<String, String>()) as Map<String, String>).values.toList())
if (bucketKey.isEmpty()) throw IllegalStateException("Cannot format bucket keys.")

val unwrappedTopHits = (bucket.getOrDefault("top_hits", mapOf<String, Any>()) as Map<String, Any>)
.getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
val topHits = unwrappedTopHits.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>

val unwrappedLowHits = (bucket.getOrDefault("low_hits", mapOf<String, Any>()) as Map<String, Any>)
.getOrDefault("hits", mapOf<String, Any>()) as Map<String, Any>
val lowHits = unwrappedLowHits.getOrDefault("hits", listOf<Map<String, Any>>()) as List<Map<String, Any>>

// Reversing the order of lowHits so allHits will be in descending order.
val allHits = topHits + lowHits.reversed()

if (allHits.isEmpty()) {
// We expect sample documents to be available for each bucket.
logger.error("Sample documents not found for trigger {} of monitor {}.", triggerId, monitorId)
}

// Removing duplicate hits. The top_hits, and low_hits results return a max of 5 docs each.
// The same document could be present in both hit lists if there are fewer than 10 documents in the bucket of data.
val uniqueHitIds = mutableSetOf<String>()
val dedupedHits = mutableListOf<Map<String, Any>>()
allHits.forEach { hit ->
val hitId = hit["_id"] as String
if (!uniqueHitIds.contains(hitId)) {
uniqueHitIds.add(hitId)
dedupedHits.add(hit)
}
}
sampleDocumentsByBucket[bucketKey] = dedupedHits
}

return sampleDocumentsByBucket
}
}

0 comments on commit b3c4f4b

Please sign in to comment.