Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added input validation, and fixed bug for cross cluster monitors. #1510

Merged
merged 15 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.REMOTE_MONITORING_ENABLED
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED
)
}

Expand Down
72 changes: 37 additions & 35 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -49,8 +48,6 @@ import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
Expand Down Expand Up @@ -100,36 +97,7 @@ class InputService(
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
client.threadPool().threadContext.stashContext().use {
scope.launch {
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
}
}
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
val startTime = Instant.now().toEpochMilli()
while (
(Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) ||
(responseMap.size < input.clusters.size)
) { /* Wait for responses */ }
results += responseMap
} else {
val response = executeTransportAction(input, client)
results += response.toMap()
}
results += handleClusterMetricsInput(input)
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
Expand Down Expand Up @@ -287,4 +255,38 @@ class InputService(

return searchRequest
}

private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList<Map<String, Any>> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val results = mutableListOf<Map<String, Any>>()
val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
// If remote monitoring is enabled, and the monitor is configured to execute against remote clusters,
// execute the API against each cluster, and compile the results.
client.threadPool().threadContext.stashContext().use {
val singleThreadContext = newSingleThreadContext("ClusterMetricsMonitorThread")
withContext(singleThreadContext) {
it.restore()
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
results += responseMap
}
}
} else {
// Else only execute the API against the local cluster.
val response = executeTransportAction(input, client)
results += response.toMap()
}
return results
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ object QueryLevelMonitorRunner : MonitorRunner() {
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
val remoteMonitoringEnabled =
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
if (remoteMonitoringEnabled)
monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.commons.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX
import org.opensearch.commons.utils.CLUSTER_PATTERN_REGEX
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException
Expand Down Expand Up @@ -36,7 +38,38 @@ class GetRemoteIndexesRequest : ActionRequest {
out.writeBoolean(includeMappings)
}

/**
* Validates the request [indexes].
* @return TRUE if all entries are valid; else FALSE.
*/
fun isValid(): Boolean {
return indexes.isNotEmpty() && indexes.all { validPattern(it) }
}

/**
* Validates individual entries in the request [indexes].
*
* @param pattern The entry to evaluate. The expected patterns are `<index-pattern>` for a local index, and
* `<cluster-pattern>:<index-pattern>` for remote indexes. These patterns are consistent with the `GET _resolve/index` API.
* @return TRUE if the entry is valid; else FALSE.
*/
private fun validPattern(pattern: String): Boolean {
// In some situations, `<cluster-pattern>` could contain a `:` character.
// Identifying the `<index-pattern>` based on the last occurrence of `:` in the pattern.
val separatorIndex = pattern.lastIndexOf(":")
return if (separatorIndex == -1) {
// Indicates a local index pattern.
INDEX_PATTERN_REGEX.matches(pattern)
} else {
// Indicates a remote index pattern.
val clusterPattern = pattern.substring(0, separatorIndex)
val indexPattern = pattern.substring(separatorIndex + 1)
CLUSTER_PATTERN_REGEX.matches(clusterPattern) && INDEX_PATTERN_REGEX.matches(indexPattern)
}
}

companion object {
const val INVALID_PATTERN_MESSAGE = "Indexes includes an invalid pattern."
const val INDEXES_FIELD = "indexes"
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

data class ClusterIndexes(
val clusterName: String,
val clusterHealth: ClusterHealthStatus,
val clusterHealth: ClusterHealthStatus?,
AWSHurneyt marked this conversation as resolved.
Show resolved Hide resolved
val hubCluster: Boolean,
val indexes: List<ClusterIndex> = listOf(),
val latency: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import org.opensearch.rest.action.RestToXContentListener
private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java)

class RestGetRemoteIndexesAction : BaseRestHandler() {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
companion object {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
}

override fun getName(): String {
return "get_remote_indexes_action"
Expand All @@ -32,7 +34,7 @@ class RestGetRemoteIndexesAction : BaseRestHandler() {
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} $ROUTE")
log.info("${request.method()} $ROUTE")
val indexes = Strings.splitStringByCommaToArray(request.param(GetRemoteIndexesRequest.INDEXES_FIELD, ""))
val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false)
return RestChannelConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,8 +202,8 @@ class AlertingSettings {
Setting.Property.Dynamic
)

val REMOTE_MONITORING_ENABLED = Setting.boolSetting(
"plugins.alerting.remote_monitoring_enabled",
val CROSS_CLUSTER_MONITORING_ENABLED = Setting.boolSetting(
"plugins.alerting.cross_cluster_monitoring_enabled",
false,
Setting.Property.NodeScope, Setting.Property.Dynamic
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes
import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.REMOTE_MONITORING_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.CROSS_CLUSTER_MONITORING_ENABLED
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.client.Client
Expand Down Expand Up @@ -62,10 +62,10 @@ class TransportGetRemoteIndexesAction @Inject constructor(

@Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)

@Volatile private var remoteMonitoringEnabled = REMOTE_MONITORING_ENABLED.get(settings)
@Volatile private var remoteMonitoringEnabled = CROSS_CLUSTER_MONITORING_ENABLED.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(REMOTE_MONITORING_ENABLED) { remoteMonitoringEnabled = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(CROSS_CLUSTER_MONITORING_ENABLED) { remoteMonitoringEnabled = it }
listenFilterBySettingChange(clusterService)
}

Expand All @@ -87,6 +87,15 @@ class TransportGetRemoteIndexesAction @Inject constructor(
val user = readUserFromThreadContext(client)
if (!validateUserBackendRoles(user, actionListener)) return

if (!request.isValid()) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(GetRemoteIndexesRequest.INVALID_PATTERN_MESSAGE, RestStatus.BAD_REQUEST)
)
)
return
}

client.threadPool().threadContext.stashContext().use {
scope.launch {
val singleThreadContext = newSingleThreadContext("GetRemoteIndexesActionThread")
Expand All @@ -96,8 +105,7 @@ class TransportGetRemoteIndexesAction @Inject constructor(

var resolveIndexResponse: ResolveIndexAction.Response? = null
try {
resolveIndexResponse =
getRemoteClusters(CrossClusterMonitorUtils.parseIndexesForRemoteSearch(request.indexes, clusterService))
resolveIndexResponse = getRemoteClusters(request.indexes)
} catch (e: Exception) {
log.error("Failed to retrieve indexes for request $request", e)
actionListener.onFailure(AlertingException.wrap(e))
Expand Down Expand Up @@ -151,7 +159,7 @@ class TransportGetRemoteIndexesAction @Inject constructor(
clusterIndexesList.add(
ClusterIndexes(
clusterName = clusterName,
clusterHealth = clusterHealthResponse!!.status,
clusterHealth = clusterHealthResponse?.status,
hubCluster = clusterName == clusterService.clusterName.value(),
indexes = clusterIndexList,
latency = latency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import org.opensearch.core.xcontent.XContentParser
class IndexUtils {

companion object {
val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""")

const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import org.opensearch.action.admin.indices.stats.CommonStats
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.support.IndicesOptions
import org.opensearch.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.common.time.DateFormatter
import org.opensearch.commons.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ import org.opensearch.action.admin.indices.stats.CommonStats
import org.opensearch.action.admin.indices.stats.IndicesStatsRequest
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse
import org.opensearch.action.admin.indices.stats.ShardStats
import org.opensearch.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX
import org.opensearch.cluster.routing.UnassignedInfo
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.StreamOutput
import org.opensearch.core.common.io.stream.Writeable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,16 +966,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
num = randomIntBetween(1, 10),
includeWriteIndex = true
),
createIndices: Boolean = true
): MutableMap<String, MutableMap<String, Boolean>> {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
val indexName = createTestIndex(index = it, mapping = "")
val isWriteIndex = indices.getOrDefault(indexName, false)
indicesMap[indexName] = isWriteIndex
if (createIndices) createTestIndex(index = it, mapping = "")
val isWriteIndex = indices.getOrDefault(it, false)
indicesMap[it] = isWriteIndex
val indexMap = mapOf(
"add" to mapOf(
"index" to indexName,
"index" to it,
"alias" to alias,
"is_write_index" to isWriteIndex
)
Expand Down Expand Up @@ -1297,6 +1298,15 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response.asMap()
}

fun RestClient.getSettings(): Map<String, Any> {
val response = this.makeRequest(
"GET",
"_cluster/settings?flat_settings=true"
)
assertEquals(RestStatus.OK, response.restStatus())
return response.asMap()
}

fun RestClient.updateSettings(setting: String, value: Any): Map<String, Any> {
val settings = jsonBuilder()
.startObject()
Expand Down
Loading
Loading