Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API to enable/disable a monitor #1771

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.resthandler.RestToggleMonitorAction
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
Expand Down Expand Up @@ -83,6 +84,7 @@ import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.transport.TransportToggleMonitorAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.client.Client
Expand Down Expand Up @@ -217,6 +219,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestIndexAlertingCommentAction(),
RestSearchAlertingCommentAction(),
RestDeleteAlertingCommentAction(),
RestToggleMonitorAction(),
)
}

Expand Down Expand Up @@ -248,7 +251,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.TOGGLE_MONITOR_ACTION_TYPE, TransportToggleMonitorAction::class.java)
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.client.node.NodeClient
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AlertingActions.TOGGLE_MONITOR_ACTION_TYPE
import org.opensearch.commons.alerting.action.ToggleMonitorRequest
import org.opensearch.commons.alerting.action.ToggleMonitorResponse
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BytesRestResponse
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestHandler.ReplacedRoute
import org.opensearch.rest.RestHandler.Route
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestRequest.Method.PUT
import java.io.IOException

private val log = LogManager.getLogger(RestToggleMonitorAction::class.java)

class RestToggleMonitorAction : BaseRestHandler() {
override fun getName(): String {
return "toggle_monitor_action"
}

override fun routes(): List<Route> {
return listOf()
}

override fun replacedRoutes(): MutableList<RestHandler.ReplacedRoute> {
return mutableListOf(
ReplacedRoute(
PUT,
"${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/enable",
PUT,
"${AlertingPlugin.LEGACY_OPENDISTRO_MONITOR_BASE_URI}/{monitorID}/enable"
),
ReplacedRoute(
PUT,
"${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/disable",
PUT,
"${AlertingPlugin.LEGACY_OPENDISTRO_MONITOR_BASE_URI}/{monitorID}/disable"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val monitorId = request.param("monitorID", Monitor.NO_ID)
if (request.method() == PUT && Monitor.NO_ID == monitorId) {
throw AlertingException.wrap(IllegalArgumentException("Missing monitor ID"))
}

// Check if the request is being made to enable the monitor
val enabled = request.path().endsWith("/enable")

log.debug("{} {}/{}/{}", request.method(), AlertingPlugin.MONITOR_BASE_URI, monitorId, if (enabled) "enable" else "disable")

return RestChannelConsumer { channel ->
val toggleMonitorRequest = ToggleMonitorRequest(
monitorId = monitorId,
enabled = enabled,
seqNo = -1, // Updated in the transport layer
primaryTerm = -1, // Updated in the transport layer
method = request.method()
)

client.execute(
TOGGLE_MONITOR_ACTION_TYPE,
toggleMonitorRequest,
object : ActionListener<ToggleMonitorResponse> {
override fun onResponse(response: ToggleMonitorResponse) {
channel.sendResponse(
BytesRestResponse(
RestStatus.OK,
response.toXContent(
XContentBuilder.builder(
XContentType.JSON.xContent()
),
ToXContent.EMPTY_PARAMS
)
)
)
}
override fun onFailure(e: Exception) {
channel.sendResponse(BytesRestResponse(channel, e))
}
}
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.transport

import org.opensearch.OpenSearchStatusException
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.common.inject.Inject
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.AlertingActions.GET_MONITOR_ACTION_TYPE
import org.opensearch.commons.alerting.action.AlertingActions.INDEX_MONITOR_ACTION_TYPE
import org.opensearch.commons.alerting.action.GetMonitorRequest
import org.opensearch.commons.alerting.action.GetMonitorResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.ToggleMonitorRequest
import org.opensearch.commons.alerting.action.ToggleMonitorResponse
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Instant

class TransportToggleMonitorAction @Inject constructor(
transportService: TransportService,
val client: NodeClient,
val namedWriteableRegistry: NamedWriteableRegistry,
actionFilters: ActionFilters
) : HandledTransportAction<ToggleMonitorRequest, ToggleMonitorResponse>(
AlertingActions.TOGGLE_MONITOR_ACTION_NAME,
transportService,
actionFilters,
::ToggleMonitorRequest
) {

override fun doExecute(
task: Task,
request: ToggleMonitorRequest,
actionListener: ActionListener<ToggleMonitorResponse>
) {
val monitorId = request.monitorId
val enabled = request.enabled

val getMonitorRequest = GetMonitorRequest(
monitorId = monitorId,
version = -3L,
method = request.method,
srcContext = null
)

client.execute(
GET_MONITOR_ACTION_TYPE,
getMonitorRequest,
object : ActionListener<GetMonitorResponse> {
override fun onResponse(getMonitorResponse: GetMonitorResponse) {
try {
if (getMonitorResponse.monitor == null) {
actionListener.onFailure(
OpenSearchStatusException("Monitor $monitorId not found", RestStatus.NOT_FOUND)
)
return
}

if (getMonitorResponse.monitor!!.enabled == enabled) {
actionListener.onFailure(
OpenSearchStatusException(
"Monitor $monitorId is already ${if (enabled) "enabled" else "disabled"}",
RestStatus.BAD_REQUEST
)
)
return
}

// Create a copy of the monitor with the updated state
val updatedMonitor = getMonitorResponse.monitor!!.copy(
enabled = enabled,
enabledTime = if (enabled) Instant.now() else null
)

// Call indexMonitor API to update the monitor
val indexMonitorRequest = IndexMonitorRequest(
monitorId = monitorId,
seqNo = getMonitorResponse.seqNo,
primaryTerm = getMonitorResponse.primaryTerm,
refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE,
method = request.method,
monitor = updatedMonitor,
)

client.execute(
INDEX_MONITOR_ACTION_TYPE,
indexMonitorRequest,
object : ActionListener<IndexMonitorResponse> {
override fun onResponse(indexMonitorResponse: IndexMonitorResponse) {
actionListener.onResponse(
ToggleMonitorResponse(
id = monitorId,
version = indexMonitorResponse.version,
seqNo = indexMonitorResponse.seqNo,
primaryTerm = indexMonitorResponse.primaryTerm,
monitor = updatedMonitor
)
)
}

override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
}
)
} catch (e: Exception) {
actionListener.onFailure(e)
}
}

override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.apache.hc.core5.http.message.BasicHeader
import org.opensearch.alerting.ALERTING_BASE_URI
import org.opensearch.alerting.ALWAYS_RUN
import org.opensearch.alerting.ANOMALY_DETECTOR_INDEX
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.AlertingRestTestCase
import org.opensearch.alerting.LEGACY_OPENDISTRO_ALERTING_BASE_URI
import org.opensearch.alerting.alerts.AlertIndices
Expand Down Expand Up @@ -1332,6 +1333,94 @@ class MonitorRestApiIT : AlertingRestTestCase() {
}
}

fun `test enable monitor`() {
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = false, enabledTime = null), refresh = true).id
val response = client().makeRequest(
"PUT",
"${AlertingPlugin.MONITOR_BASE_URI}/$monitorId/enable",
emptyMap(),
null
)
val jsonResponse = createParser(XContentType.JSON.xContent(), response.entity.content).map()
val monitor = jsonResponse["Monitor"] as Map<*, *>
val enabled = monitor["enabled"]
// Monitor should be enabled
assertEquals(true, enabled)
}

fun `test disable monitor`() {
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true, enabledTime = Instant.now()), refresh = true).id
val response = client().makeRequest(
"PUT",
"${AlertingPlugin.MONITOR_BASE_URI}/$monitorId/disable",
emptyMap(),
null
)
val jsonResponse = createParser(XContentType.JSON.xContent(), response.entity.content).map()
val monitor = jsonResponse["Monitor"] as Map<*, *>
val enabled = monitor["enabled"]
val enabledTime = monitor["enabledTime"]
// Monitor should be disabled and the enabledTime should be null
assertEquals(false, enabled)
assertNull(enabledTime)
}

fun `test enable monitor when already enabled`() {
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true, enabledTime = Instant.now()), refresh = true).id
val exception = assertThrows(ResponseException::class.java) {
client().makeRequest(
"PUT",
"${AlertingPlugin.MONITOR_BASE_URI}/$monitorId/enable",
emptyMap(),
null
)
}
val errorResponse = createParser(XContentType.JSON.xContent(), exception.response.entity.content).map()
// Expected error
val expectedError = mapOf(
"error" to mapOf(
"root_cause" to listOf(
mapOf(
"type" to "status_exception",
"reason" to "Monitor $monitorId is already enabled"
)
),
"type" to "status_exception",
"reason" to "Monitor $monitorId is already enabled"
),
"status" to 400
)
assertEquals(expectedError, errorResponse)
}

fun `test disable monitor when already disabled`() {
val monitorId = createMonitor(randomQueryLevelMonitor(enabled = false, enabledTime = null), refresh = true).id
val exception = assertThrows(ResponseException::class.java) {
client().makeRequest(
"PUT",
"${AlertingPlugin.MONITOR_BASE_URI}/$monitorId/disable",
emptyMap(),
null
)
}
val errorResponse = createParser(XContentType.JSON.xContent(), exception.response.entity.content).map()
// Expected error
val expectedError = mapOf(
"error" to mapOf(
"root_cause" to listOf(
mapOf(
"type" to "status_exception",
"reason" to "Monitor $monitorId is already disabled"
)
),
"type" to "status_exception",
"reason" to "Monitor $monitorId is already disabled"
),
"status" to 400
)
assertEquals(expectedError, errorResponse)
}

/**
* This use case is needed by the frontend plugin for displaying alert counts on the Monitors list page.
* https://github.com/opensearch-project/alerting-dashboards-plugin/blob/main/server/services/MonitorService.js#L235
Expand Down