diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 1f8a4d514..a759b40e3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -51,6 +51,7 @@ import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction import org.opensearch.alerting.resthandler.RestSearchMonitorAction +import org.opensearch.alerting.resthandler.RestUpdateMonitorStateAction import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings @@ -83,6 +84,7 @@ import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction import org.opensearch.alerting.transport.TransportSearchEmailAccountAction import org.opensearch.alerting.transport.TransportSearchEmailGroupAction import org.opensearch.alerting.transport.TransportSearchMonitorAction +import org.opensearch.alerting.transport.TransportUpdateMonitorStateAction import org.opensearch.alerting.util.DocLevelMonitorQueries import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator import org.opensearch.client.Client @@ -217,6 +219,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestIndexAlertingCommentAction(), RestSearchAlertingCommentAction(), RestDeleteAlertingCommentAction(), + RestUpdateMonitorStateAction(), ) } @@ -248,7 +251,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java), ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java), ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java), - ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java) + ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java), + ActionPlugin.ActionHandler(AlertingActions.UPDATE_MONITOR_STATE_ACTION_TYPE, TransportUpdateMonitorStateAction::class.java) ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestUpdateMonitorStateAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestUpdateMonitorStateAction.kt new file mode 100644 index 000000000..4e8f188fc --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestUpdateMonitorStateAction.kt @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.client.node.NodeClient +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.action.AlertingActions.UPDATE_MONITOR_STATE_ACTION_TYPE +import org.opensearch.commons.alerting.action.UpdateMonitorStateRequest +import org.opensearch.commons.alerting.action.UpdateMonitorStateResponse +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.util.AlertingException +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.BytesRestResponse +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestHandler.ReplacedRoute +import org.opensearch.rest.RestHandler.Route +import org.opensearch.rest.RestRequest +import org.opensearch.rest.RestRequest.Method.PUT +import java.io.IOException + +private val log = LogManager.getLogger(RestUpdateMonitorStateAction::class.java) + +// Rest handler to enable and disable monitors. +class RestUpdateMonitorStateAction : BaseRestHandler() { + override fun getName(): String { + return "update_monitor_state_action" + } + + override fun routes(): List { + return listOf() + } + + override fun replacedRoutes(): MutableList { + return mutableListOf( + ReplacedRoute( + PUT, + "${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/enable", + PUT, + "${AlertingPlugin.LEGACY_OPENDISTRO_MONITOR_BASE_URI}/{monitorID}/enable" + ), + ReplacedRoute( + PUT, + "${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/disable", + PUT, + "${AlertingPlugin.LEGACY_OPENDISTRO_MONITOR_BASE_URI}/{monitorID}/disable" + ) + ) + } + + @Throws(IOException::class) + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + val monitorId = request.param("monitorID", Monitor.NO_ID) + if (request.method() == PUT && Monitor.NO_ID == monitorId) { + throw AlertingException.wrap(IllegalArgumentException("Missing monitor ID")) + } + + // Check if the request is being made to enable the monitor + val enabled = request.path().endsWith("/enable") + + log.debug("{} {}/{}/{}", request.method(), AlertingPlugin.MONITOR_BASE_URI, monitorId, if (enabled) "enable" else "disable") + + return RestChannelConsumer { channel -> + val updateMonitorStateRequest = UpdateMonitorStateRequest( + monitorId = monitorId, + enabled = enabled, + seqNo = -1, // This will be handled in transport layer + primaryTerm = -1, // This will be handled in transport layer + method = request.method() + ) + + client.execute( + UPDATE_MONITOR_STATE_ACTION_TYPE, + updateMonitorStateRequest, + object : ActionListener { + override fun onResponse(response: UpdateMonitorStateResponse) { + channel.sendResponse( + BytesRestResponse( + RestStatus.OK, + response.toXContent( + XContentBuilder.builder( + XContentType.JSON.xContent() + ), + ToXContent.EMPTY_PARAMS + ) + ) + ) + } + override fun onFailure(e: Exception) { + channel.sendResponse(BytesRestResponse(channel, e)) + } + } + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportUpdateMonitorStateAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportUpdateMonitorStateAction.kt new file mode 100644 index 000000000..ea7da5117 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportUpdateMonitorStateAction.kt @@ -0,0 +1,131 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.WriteRequest +import org.opensearch.client.node.NodeClient +import org.opensearch.common.inject.Inject +import org.opensearch.commons.alerting.action.AlertingActions +import org.opensearch.commons.alerting.action.AlertingActions.GET_MONITOR_ACTION_TYPE +import org.opensearch.commons.alerting.action.AlertingActions.INDEX_MONITOR_ACTION_TYPE +import org.opensearch.commons.alerting.action.GetMonitorRequest +import org.opensearch.commons.alerting.action.GetMonitorResponse +import org.opensearch.commons.alerting.action.IndexMonitorRequest +import org.opensearch.commons.alerting.action.IndexMonitorResponse +import org.opensearch.commons.alerting.action.UpdateMonitorStateRequest +import org.opensearch.commons.alerting.action.UpdateMonitorStateResponse +import org.opensearch.core.action.ActionListener +import org.opensearch.core.common.io.stream.NamedWriteableRegistry +import org.opensearch.core.rest.RestStatus +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.time.Instant + +class TransportUpdateMonitorStateAction @Inject constructor( + transportService: TransportService, + val client: NodeClient, + val namedWriteableRegistry: NamedWriteableRegistry, + actionFilters: ActionFilters +) : HandledTransportAction( + AlertingActions.UPDATE_MONITOR_STATE_ACTION_NAME, + transportService, + actionFilters, + ::UpdateMonitorStateRequest +) { + private val log = LogManager.getLogger(javaClass) + + override fun doExecute( + task: Task, + request: UpdateMonitorStateRequest, + actionListener: ActionListener + ) { + val monitorId = request.monitorId + val enabled = request.enabled + + val getMonitorRequest = GetMonitorRequest( + monitorId = monitorId, + version = -3L, + method = request.method, + srcContext = null + ) + + client.execute( + GET_MONITOR_ACTION_TYPE, + getMonitorRequest, + object : ActionListener { + override fun onResponse(getMonitorResponse: GetMonitorResponse) { + try { + if (getMonitorResponse.monitor == null) { + actionListener.onFailure( + OpenSearchStatusException("Monitor $monitorId not found", RestStatus.NOT_FOUND) + ) + return + } + + if (getMonitorResponse.monitor!!.enabled == enabled) { + actionListener.onFailure( + OpenSearchStatusException( + "Monitor $monitorId is already ${if (enabled) "enabled" else "disabled"}", + RestStatus.BAD_REQUEST + ) + ) + return + } + + // Create updated monitor with new enabled state + val updatedMonitor = getMonitorResponse.monitor!!.copy( + enabled = enabled, + enabledTime = if (enabled) Instant.now() else null + ) + + // Create index monitor request + val indexMonitorRequest = IndexMonitorRequest( + monitorId = monitorId, + seqNo = getMonitorResponse.seqNo, + primaryTerm = getMonitorResponse.primaryTerm, + refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE, + method = request.method, + monitor = updatedMonitor, + ) + + // Execute index monitor request + client.execute( + INDEX_MONITOR_ACTION_TYPE, + indexMonitorRequest, + object : ActionListener { + override fun onResponse(indexMonitorResponse: IndexMonitorResponse) { + actionListener.onResponse( + UpdateMonitorStateResponse( + id = monitorId, + version = indexMonitorResponse.version, + seqNo = indexMonitorResponse.seqNo, + primaryTerm = indexMonitorResponse.primaryTerm, + monitor = updatedMonitor + ) + ) + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(e) + } + } + ) + } catch (e: Exception) { + actionListener.onFailure(e) + } + } + + override fun onFailure(e: Exception) { + actionListener.onFailure(e) + } + } + ) + } +}