Skip to content

Commit

Permalink
Added input validation, and fixed bug for cross cluster monitors. (#1510
Browse files Browse the repository at this point in the history
)

* Fixed issue where InputService wouldn't wait for cluster metrics monitor to finish executing against all clusters.

Signed-off-by: AWSHurneyt <[email protected]>

* Added input validation for GetRemoteIndexes API, and added related unit and integration tests.

Signed-off-by: AWSHurneyt <[email protected]>

* Removed unused variable, and imports.

Signed-off-by: AWSHurneyt <[email protected]>

* Made initial call to GetRemoteIndexes API log INFO level to capture timestamp of incoming request.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed comment.

Signed-off-by: AWSHurneyt <[email protected]>

* Moved some regex to common utils.

Signed-off-by: AWSHurneyt <[email protected]>

* Renamed cross cluster monitor setting.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed import.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed import.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed ktlint error.

Signed-off-by: AWSHurneyt <[email protected]>

* Added null checks for health statuses.

Signed-off-by: AWSHurneyt <[email protected]>

* Wrapped Monitor.parse calls in AlertingExceptions so IllegalArgumentExceptions are wrapped in 4xx-level exceptions.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed merge error.

Signed-off-by: AWSHurneyt <[email protected]>

* Fixed test.

Signed-off-by: AWSHurneyt <[email protected]>

---------

Signed-off-by: AWSHurneyt <[email protected]>
  • Loading branch information
AWSHurneyt committed Apr 13, 2024
1 parent 12a39ba commit 3fe6cfa
Show file tree
Hide file tree
Showing 17 changed files with 606 additions and 91 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.REMOTE_MONITORING_ENABLED
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED
)
}

Expand Down
72 changes: 37 additions & 35 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -48,8 +47,6 @@ import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
Expand Down Expand Up @@ -99,36 +96,7 @@ class InputService(
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
client.threadPool().threadContext.stashContext().use {
scope.launch {
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
}
}
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
val startTime = Instant.now().toEpochMilli()
while (
(Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) ||
(responseMap.size < input.clusters.size)
) { /* Wait for responses */ }
results += responseMap
} else {
val response = executeTransportAction(input, client)
results += response.toMap()
}
results += handleClusterMetricsInput(input)
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
Expand Down Expand Up @@ -286,4 +254,38 @@ class InputService(

return searchRequest
}

private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList<Map<String, Any>> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val results = mutableListOf<Map<String, Any>>()
val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
// If remote monitoring is enabled, and the monitor is configured to execute against remote clusters,
// execute the API against each cluster, and compile the results.
client.threadPool().threadContext.stashContext().use {
val singleThreadContext = newSingleThreadContext("ClusterMetricsMonitorThread")
withContext(singleThreadContext) {
it.restore()
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
results += responseMap
}
}
} else {
// Else only execute the API against the local cluster.
val response = executeTransportAction(input, client)
results += response.toMap()
}
return results
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
val remoteMonitoringEnabled =
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
if (remoteMonitoringEnabled)
monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX
import org.opensearch.commons.utils.CLUSTER_PATTERN_REGEX
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException
Expand Down Expand Up @@ -36,7 +38,38 @@ class GetRemoteIndexesRequest : ActionRequest {
out.writeBoolean(includeMappings)
}

/**
* Validates the request [indexes].
* @return TRUE if all entries are valid; else FALSE.
*/
fun isValid(): Boolean {
return indexes.isNotEmpty() && indexes.all { validPattern(it) }
}

/**
* Validates individual entries in the request [indexes].
*
* @param pattern The entry to evaluate. The expected patterns are `<index-pattern>` for a local index, and
* `<cluster-pattern>:<index-pattern>` for remote indexes. These patterns are consistent with the `GET _resolve/index` API.
* @return TRUE if the entry is valid; else FALSE.
*/
private fun validPattern(pattern: String): Boolean {
// In some situations, `<cluster-pattern>` could contain a `:` character.
// Identifying the `<index-pattern>` based on the last occurrence of `:` in the pattern.
val separatorIndex = pattern.lastIndexOf(":")
return if (separatorIndex == -1) {
// Indicates a local index pattern.
INDEX_PATTERN_REGEX.matches(pattern)
} else {
// Indicates a remote index pattern.
val clusterPattern = pattern.substring(0, separatorIndex)
val indexPattern = pattern.substring(separatorIndex + 1)
CLUSTER_PATTERN_REGEX.matches(clusterPattern) && INDEX_PATTERN_REGEX.matches(indexPattern)
}
}

companion object {
const val INVALID_PATTERN_MESSAGE = "Indexes includes an invalid pattern."
const val INDEXES_FIELD = "indexes"
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

data class ClusterIndexes(
val clusterName: String,
val clusterHealth: ClusterHealthStatus,
val clusterHealth: ClusterHealthStatus?,
val hubCluster: Boolean,
val indexes: List<ClusterIndex> = listOf(),
val latency: Long
Expand All @@ -51,7 +51,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
clusterName = sin.readString(),
clusterHealth = sin.readEnum(ClusterHealthStatus::class.java),
clusterHealth = sin.readOptionalWriteable(ClusterHealthStatus::readFrom),
hubCluster = sin.readBoolean(),
indexes = sin.readList((ClusterIndex.Companion)::readFrom),
latency = sin.readLong()
Expand All @@ -72,7 +72,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

override fun writeTo(out: StreamOutput) {
out.writeString(clusterName)
out.writeEnum(clusterHealth)
if (clusterHealth != null) out.writeEnum(clusterHealth)
indexes.forEach { it.writeTo(out) }
out.writeLong(latency)
}
Expand Down Expand Up @@ -100,7 +100,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {
@Throws(IOException::class)
constructor(sin: StreamInput) : this(
indexName = sin.readString(),
indexHealth = sin.readEnum(ClusterHealthStatus::class.java),
indexHealth = sin.readOptionalWriteable(ClusterHealthStatus::readFrom),
mappings = sin.readOptionalWriteable(::MappingMetadata)
)

Expand All @@ -115,7 +115,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

override fun writeTo(out: StreamOutput) {
out.writeString(indexName)
out.writeEnum(indexHealth)
if (indexHealth != null) out.writeEnum(indexHealth)
if (mappings != null) out.writeMap(mappings.sourceAsMap)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteMonitorRequest
import org.opensearch.alerting.util.AlertingException
import org.opensearch.client.node.NodeClient
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.Monitor
Expand Down Expand Up @@ -64,7 +65,14 @@ class RestExecuteMonitorAction : BaseRestHandler() {
} else {
val xcp = request.contentParser()
ensureExpectedToken(START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, Monitor.NO_ID, Monitor.NO_VERSION)

val monitor: Monitor
try {
monitor = Monitor.parse(xcp, Monitor.NO_ID, Monitor.NO_VERSION)
} catch (e: Exception) {
throw AlertingException.wrap(e)
}

val execMonitorRequest = ExecuteMonitorRequest(dryrun, requestEnd, null, monitor)
client.execute(ExecuteMonitorAction.INSTANCE, execMonitorRequest, RestToXContentListener(channel))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import org.opensearch.rest.action.RestToXContentListener
private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java)

class RestGetRemoteIndexesAction : BaseRestHandler() {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
companion object {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
}

override fun getName(): String {
return "get_remote_indexes_action"
Expand All @@ -32,7 +34,7 @@ class RestGetRemoteIndexesAction : BaseRestHandler() {
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} $ROUTE")
log.info("${request.method()} $ROUTE")
val indexes = Strings.splitStringByCommaToArray(request.param(GetRemoteIndexesRequest.INDEXES_FIELD, ""))
val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false)
return RestChannelConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.AlertingPlugin
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IF_PRIMARY_TERM
import org.opensearch.alerting.util.IF_SEQ_NO
import org.opensearch.alerting.util.REFRESH
Expand Down Expand Up @@ -80,49 +81,59 @@ class RestIndexMonitorAction : BaseRestHandler() {

val id = request.param("monitorID", Monitor.NO_ID)
if (request.method() == PUT && Monitor.NO_ID == id) {
throw IllegalArgumentException("Missing monitor ID")
throw AlertingException.wrap(IllegalArgumentException("Missing monitor ID"))
}

// Validate request by parsing JSON to Monitor
val xcp = request.contentParser()
ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp)
val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())
val rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?

validateDataSources(monitor)
validateOwner(monitor.owner)
val monitorType = monitor.monitorType
val triggers = monitor.triggers
when (monitorType) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor")

val monitor: Monitor
val rbacRoles: List<String>?
try {
monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now())

rbacRoles = request.contentParser().map()["rbac_roles"] as List<String>?

validateDataSources(monitor)
validateOwner(monitor.owner)

val monitorType = monitor.monitorType
val triggers = monitor.triggers

when (monitorType) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
throw (IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor"))
}
}
}
}
Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is BucketLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor")
Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is BucketLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor")
}
}
}
}
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor")
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
triggers.forEach {
if (it !is QueryLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor")
}
}
}
}
Monitor.MonitorType.DOC_LEVEL_MONITOR -> {
triggers.forEach {
if (it !is DocumentLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor")
Monitor.MonitorType.DOC_LEVEL_MONITOR -> {
validateDocLevelQueryName(monitor)
triggers.forEach {
if (it !is DocumentLevelTrigger) {
throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor")
}
}
}
validateDocLevelQueryName(monitor)
}
} catch (e: Exception) {
throw AlertingException.wrap(e)
}

val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val REMOTE_MONITORING_ENABLED = Setting.boolSetting(
"plugins.alerting.remote_monitoring_enabled",
val CROSS_CLUSTER_MONITORING_ENABLED = Setting.boolSetting(
"plugins.alerting.cross_cluster_monitoring_enabled",
false,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
Expand Down
Loading

0 comments on commit 3fe6cfa

Please sign in to comment.