Skip to content

Commit

Permalink
Fixed monitor type checks. (opensearch-project#1558)
Browse files Browse the repository at this point in the history
* Fixed monitor type checks.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed monitor type checks.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint errors. Removed redundant toString calls.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed wildcard imports.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Jun 25, 2024
1 parent 239854c commit 8713997
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Monitor.MonitorType
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
Expand All @@ -48,6 +49,8 @@ import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.transport.RemoteTransportException
import java.util.Locale
import kotlin.collections.HashMap

private val log = LogManager.getLogger(MonitorMetadataService::class.java)

Expand Down Expand Up @@ -187,12 +190,12 @@ object MonitorMetadataService :

suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata {
try {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
} else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
else null
val runContext = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap<String, MutableMap<String, Any>>)
} else null
else null
return if (runContext != null) {
metadata.copy(
lastRunContext = runContext
Expand All @@ -210,12 +213,13 @@ object MonitorMetadataService :
createWithRunContext: Boolean,
workflowMetadataId: String? = null,
): MonitorMetadata {
val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR)
val monitorIndex = if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR)
(monitor.inputs[0] as DocLevelMonitorInput).indices[0]
else null
val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
else emptyMap()
val runContext =
if (MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR && createWithRunContext)
createFullRunContext(monitorIndex)
else emptyMap()
return MonitorMetadata(
id = MonitorMetadata.getId(monitor, workflowMetadataId),
seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
val monitor = job as Monitor
val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}"
logger.info(
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType.name}, periodStart: $periodStart, " +
"Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType}, periodStart: $periodStart, " +
"periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId"
)
val runResult = if (monitor.isBucketLevelMonitor()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.Locale

object QueryLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand Down Expand Up @@ -68,7 +69,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = when (monitor.monitorType) {
val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
Expand All @@ -80,7 +81,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.")
}

triggerResults[trigger.id] = triggerResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import org.opensearch.rest.RestResponse
import org.opensearch.rest.action.RestResponseListener
import java.io.IOException
import java.time.Instant
import java.util.Locale

private val log = LogManager.getLogger(RestIndexMonitorAction::class.java)

Expand Down Expand Up @@ -100,8 +101,7 @@ class RestIndexMonitorAction : BaseRestHandler() {

val monitorType = monitor.monitorType
val triggers = monitor.triggers

when (monitorType) {
when (Monitor.MonitorType.valueOf(monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.Locale

private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
Expand Down Expand Up @@ -83,7 +84,7 @@ class TransportExecuteMonitorAction @Inject constructor(
}
try {
log.info(
"Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType.name}, " +
"Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType}, " +
"periodStart: $periodStart, periodEnd: $periodEnd, dryrun: ${execMonitorRequest.dryrun}"
)
val monitorRunResult = runner.runJob(monitor, periodStart, periodEnd, execMonitorRequest.dryrun, transportService)
Expand Down Expand Up @@ -137,7 +138,7 @@ class TransportExecuteMonitorAction @Inject constructor(
false -> (execMonitorRequest.monitor as Monitor).copy(user = user)
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
try {
scope.launch {
if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelMonitorInput.Companion.DOC_LEVEL_INPUT_FIELD
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Monitor.MonitorType
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX
import org.opensearch.commons.alerting.model.SearchInput
Expand All @@ -81,6 +82,7 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.io.IOException
import java.time.Duration
import java.util.Locale

private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)
Expand Down Expand Up @@ -536,7 +538,7 @@ class TransportIndexMonitorAction @Inject constructor(
throw t
}
try {
if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR) {
indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy)
}
// When inserting queries in queryIndex we could update sourceToQueryIndexMapping
Expand Down Expand Up @@ -700,7 +702,7 @@ class TransportIndexMonitorAction @Inject constructor(
val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor)
// Recreate runContext if metadata exists
// Delete and insert all queries from/to queryIndex
if (created == false && currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (!created && MonitorType.valueOf(currentMonitor.monitorType.uppercase(Locale.ROOT)) == MonitorType.DOC_LEVEL_MONITOR) {
updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor)
client.suspendUntil<Client, BulkByScrollResponse> {
DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ import org.opensearch.rest.RestRequest
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.util.Locale
import java.util.UUID
import java.util.stream.Collectors

Expand Down Expand Up @@ -399,7 +400,7 @@ class TransportIndexWorkflowAction @Inject constructor(
log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!")
}

if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
monitorMetadata = monitorMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
}
Expand Down Expand Up @@ -553,7 +554,10 @@ class TransportIndexWorkflowAction @Inject constructor(
workflowMetadataId = workflowMetadata.id
)

if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) {
if (
!created &&
Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR
) {
var updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor)
val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor)
updatedMetadata = updatedMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping)
Expand Down Expand Up @@ -631,7 +635,7 @@ class TransportIndexWorkflowAction @Inject constructor(
* Returns list of indices for the given monitor depending on it's type
*/
private fun getMonitorIndices(monitor: Monitor): List<String> {
return when (monitor.monitorType) {
return when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) {
Monitor.MonitorType.DOC_LEVEL_MONITOR -> (monitor.inputs[0] as DocLevelMonitorInput).indices
Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> monitor.inputs.flatMap { s -> (s as SearchInput).indices }
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.script.Script
import java.util.Locale

private val logger = LogManager.getLogger("AlertingUtils")

Expand Down Expand Up @@ -74,9 +75,11 @@ fun Destination.isAllowed(allowList: List<String>): Boolean = allowList.contains

fun Destination.isTestAction(): Boolean = this.type == DestinationType.TEST_ACTION

fun Monitor.isDocLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR
fun Monitor.isDocLevelMonitor(): Boolean =
Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR

fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.QUERY_LEVEL_MONITOR
fun Monitor.isQueryLevelMonitor(): Boolean =
Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.QUERY_LEVEL_MONITOR

/**
* Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,7 +494,7 @@ fun randomADMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime,
user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand Down
14 changes: 7 additions & 7 deletions alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ fun randomQueryLevelMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -113,7 +113,7 @@ fun randomQueryLevelMonitorWithoutUser(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -137,7 +137,7 @@ fun randomBucketLevelMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -162,7 +162,7 @@ fun randomBucketLevelMonitor(
dataSources: DataSources
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(),
dataSources = dataSources
Expand All @@ -181,7 +181,7 @@ fun randomClusterMetricsMonitor(
withMetadata: Boolean = false
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf()
)
Expand All @@ -200,7 +200,7 @@ fun randomDocumentLevelMonitor(
owner: String? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), owner = owner
)
Expand All @@ -220,7 +220,7 @@ fun randomDocumentLevelMonitor(
owner: String? = null
): Monitor {
return Monitor(
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs,
name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs,
schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner
)
Expand Down

0 comments on commit 8713997

Please sign in to comment.