Skip to content

Commit

Permalink
feat: add an api endpoint to toggle the state of a monitor
Browse files Browse the repository at this point in the history
Signed-off-by: vikhy-aws <[email protected]>
  • Loading branch information
vikhy-aws committed Jan 16, 2025
1 parent 2e1cc91 commit 58de5a9
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import org.opensearch.alerting.resthandler.RestSearchAlertingCommentAction
import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.resthandler.RestUpdateMonitorStateAction
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
Expand Down Expand Up @@ -83,6 +84,7 @@ import org.opensearch.alerting.transport.TransportSearchAlertingCommentAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.transport.TransportUpdateMonitorStateAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.client.Client
Expand Down Expand Up @@ -217,6 +219,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestIndexAlertingCommentAction(),
RestSearchAlertingCommentAction(),
RestDeleteAlertingCommentAction(),
RestUpdateMonitorStateAction(),
)
}

Expand Down Expand Up @@ -248,7 +251,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.DELETE_COMMENT_ACTION_TYPE, TransportDeleteAlertingCommentAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.UPDATE_MONITOR_STATE_ACTION_TYPE, TransportUpdateMonitorStateAction::class.java)
)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.resthandler

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.client.node.NodeClient
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.action.AlertingActions.UPDATE_MONITOR_STATE_ACTION_TYPE
import org.opensearch.commons.alerting.action.UpdateMonitorStateRequest
import org.opensearch.commons.alerting.action.UpdateMonitorStateResponse
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.util.AlertingException
import org.opensearch.core.action.ActionListener
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.ToXContent
import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.rest.BaseRestHandler
import org.opensearch.rest.BytesRestResponse
import org.opensearch.rest.RestHandler
import org.opensearch.rest.RestHandler.ReplacedRoute
import org.opensearch.rest.RestHandler.Route
import org.opensearch.rest.RestRequest
import org.opensearch.rest.RestRequest.Method.PUT
import java.io.IOException

private val log = LogManager.getLogger(RestUpdateMonitorStateAction::class.java)

// Rest handler to enable and disable monitors.
class RestUpdateMonitorStateAction : BaseRestHandler() {
override fun getName(): String {
return "update_monitor_state_action"
}

override fun routes(): List<Route> {
return listOf()
}

override fun replacedRoutes(): MutableList<RestHandler.ReplacedRoute> {
return mutableListOf(
ReplacedRoute(
PUT,
"${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/enable",
PUT,
"${AlertingPlugin.LEGACY_OPENDISTRO_MONITOR_BASE_URI}/{monitorID}/enable"
),
ReplacedRoute(
PUT,
"${AlertingPlugin.MONITOR_BASE_URI}/{monitorID}/disable",
PUT,
"${AlertingPlugin.LEGACY_OPENDISTRO_MONITOR_BASE_URI}/{monitorID}/disable"
)
)
}

@Throws(IOException::class)
override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
val monitorId = request.param("monitorID", Monitor.NO_ID)
if (request.method() == PUT && Monitor.NO_ID == monitorId) {
throw AlertingException.wrap(IllegalArgumentException("Missing monitor ID"))
}

// Check if the request is being made to enable the monitor
val enabled = request.path().endsWith("/enable")

log.debug("{} {}/{}/{}", request.method(), AlertingPlugin.MONITOR_BASE_URI, monitorId, if (enabled) "enable" else "disable")

return RestChannelConsumer { channel ->
val updateMonitorStateRequest = UpdateMonitorStateRequest(
monitorId = monitorId,
enabled = enabled,
seqNo = -1, // This will be handled in transport layer
primaryTerm = -1, // This will be handled in transport layer
method = request.method()
)

client.execute(
UPDATE_MONITOR_STATE_ACTION_TYPE,
updateMonitorStateRequest,
object : ActionListener<UpdateMonitorStateResponse> {
override fun onResponse(response: UpdateMonitorStateResponse) {
channel.sendResponse(
BytesRestResponse(
RestStatus.OK,
response.toXContent(
XContentBuilder.builder(
XContentType.JSON.xContent()
),
ToXContent.EMPTY_PARAMS
)
)
)
}
override fun onFailure(e: Exception) {
channel.sendResponse(BytesRestResponse(channel, e))
}
}
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.transport

import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest
import org.opensearch.client.node.NodeClient
import org.opensearch.common.inject.Inject
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.AlertingActions.GET_MONITOR_ACTION_TYPE
import org.opensearch.commons.alerting.action.AlertingActions.INDEX_MONITOR_ACTION_TYPE
import org.opensearch.commons.alerting.action.GetMonitorRequest
import org.opensearch.commons.alerting.action.GetMonitorResponse
import org.opensearch.commons.alerting.action.IndexMonitorRequest
import org.opensearch.commons.alerting.action.IndexMonitorResponse
import org.opensearch.commons.alerting.action.UpdateMonitorStateRequest
import org.opensearch.commons.alerting.action.UpdateMonitorStateResponse
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.rest.RestStatus
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService
import java.time.Instant

class TransportUpdateMonitorStateAction @Inject constructor(
transportService: TransportService,
val client: NodeClient,
val namedWriteableRegistry: NamedWriteableRegistry,
actionFilters: ActionFilters
) : HandledTransportAction<UpdateMonitorStateRequest, UpdateMonitorStateResponse>(
AlertingActions.UPDATE_MONITOR_STATE_ACTION_NAME,
transportService,
actionFilters,
::UpdateMonitorStateRequest
) {
private val log = LogManager.getLogger(javaClass)

override fun doExecute(
task: Task,
request: UpdateMonitorStateRequest,
actionListener: ActionListener<UpdateMonitorStateResponse>
) {
val monitorId = request.monitorId
val enabled = request.enabled

val getMonitorRequest = GetMonitorRequest(
monitorId = monitorId,
version = -3L,
method = request.method,
srcContext = null
)

client.execute(
GET_MONITOR_ACTION_TYPE,
getMonitorRequest,
object : ActionListener<GetMonitorResponse> {
override fun onResponse(getMonitorResponse: GetMonitorResponse) {
try {
if (getMonitorResponse.monitor == null) {
actionListener.onFailure(
OpenSearchStatusException("Monitor $monitorId not found", RestStatus.NOT_FOUND)
)
return
}

if (getMonitorResponse.monitor!!.enabled == enabled) {
actionListener.onFailure(
OpenSearchStatusException(
"Monitor $monitorId is already ${if (enabled) "enabled" else "disabled"}",
RestStatus.BAD_REQUEST
)
)
return
}

// Create updated monitor with new enabled state
val updatedMonitor = getMonitorResponse.monitor!!.copy(
enabled = enabled,
enabledTime = if (enabled) Instant.now() else null
)

// Create index monitor request
val indexMonitorRequest = IndexMonitorRequest(
monitorId = monitorId,
seqNo = getMonitorResponse.seqNo,
primaryTerm = getMonitorResponse.primaryTerm,
refreshPolicy = WriteRequest.RefreshPolicy.IMMEDIATE,
method = request.method,
monitor = updatedMonitor,
)

// Execute index monitor request
client.execute(
INDEX_MONITOR_ACTION_TYPE,
indexMonitorRequest,
object : ActionListener<IndexMonitorResponse> {
override fun onResponse(indexMonitorResponse: IndexMonitorResponse) {
actionListener.onResponse(
UpdateMonitorStateResponse(
id = monitorId,
version = indexMonitorResponse.version,
seqNo = indexMonitorResponse.seqNo,
primaryTerm = indexMonitorResponse.primaryTerm,
monitor = updatedMonitor
)
)
}

override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
}
)
} catch (e: Exception) {
actionListener.onFailure(e)
}
}

override fun onFailure(e: Exception) {
actionListener.onFailure(e)
}
}
)
}
}

0 comments on commit 58de5a9

Please sign in to comment.