Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added input validation, and fixed bug for cross cluster monitors. #1510

Merged
merged 15 commits into from
Apr 13, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 37 additions & 35 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -49,8 +48,6 @@ import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
Expand Down Expand Up @@ -100,36 +97,7 @@ class InputService(
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
client.threadPool().threadContext.stashContext().use {
scope.launch {
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
}
}
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
val startTime = Instant.now().toEpochMilli()
while (
(Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) ||
(responseMap.size < input.clusters.size)
) { /* Wait for responses */ }
results += responseMap
} else {
val response = executeTransportAction(input, client)
results += response.toMap()
}
results += handleClusterMetricsInput(input)
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
Expand Down Expand Up @@ -287,4 +255,38 @@ class InputService(

return searchRequest
}

private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList<Map<String, Any>> {
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val results = mutableListOf<Map<String, Any>>()
val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
// If remote monitoring is enabled, and the monitor is configured to execute against remote clusters,
// execute the API against each cluster, and compile the results.
client.threadPool().threadContext.stashContext().use {
val singleThreadContext = newSingleThreadContext("ClusterMetricsMonitorThread")
withContext(singleThreadContext) {
it.restore()
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
results += responseMap
}
}
} else {
// Else only execute the API against the local cluster.
val response = executeTransportAction(input, client)
results += response.toMap()
}
return results
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException
Expand Down Expand Up @@ -36,7 +37,46 @@ class GetRemoteIndexesRequest : ActionRequest {
out.writeBoolean(includeMappings)
}

/**
* Validates the request [indexes].
* @return TRUE if all entries are valid; else FALSE.
*/
fun isValid(): Boolean {
return indexes.isNotEmpty() && indexes.all { validPattern(it) }
}

/**
* Validates individual entries in the request [indexes].
*
* @param pattern The entry to evaluate. The expected patterns are `<index-pattern>` for a local index, and
* `<cluster-pattern>:<index-pattern>` for remote indexes. These patterns are consistent with the `GET _resolve/index` API.
* @return TRUE if the entry is valid; else FALSE.
*/
private fun validPattern(pattern: String): Boolean {
// In some situations, `<cluster-pattern>` could contain a `:` character.
// Identifying the `<index-pattern>` based on the last occurrence of `:` in the pattern.
val separatorIndex = pattern.lastIndexOf(":")
return if (separatorIndex == -1) {
// Indicates a local index pattern.
INDEX_PATTERN_REGEX.matches(pattern)
} else {
// Indicates a remote index pattern.
val clusterPattern = pattern.substring(0, separatorIndex)
val indexPattern = pattern.substring(separatorIndex + 1)
CLUSTER_PATTERN_REGEX.matches(clusterPattern) && INDEX_PATTERN_REGEX.matches(indexPattern)
}
}

companion object {
/**
* This regex asserts that the string:
* Starts with a lowercase letter, digit, or asterisk
* Contains a sequence of characters followed by an optional colon and another sequence of characters
* The sequences of characters can include lowercase letters, uppercase letters, digits, underscores, asterisks, or hyphens
* The total length of the string can range from 1 to 255 characters
*/
val CLUSTER_PATTERN_REGEX = Regex("^(?=.{1,255}$)[a-z0-9*]([a-zA-Z0-9_*-]*:?[a-zA-Z0-9_*-]*)$")
AWSHurneyt marked this conversation as resolved.
Show resolved Hide resolved
const val INVALID_PATTERN_MESSAGE = "Indexes includes an invalid pattern."
const val INDEXES_FIELD = "indexes"
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject {

data class ClusterIndexes(
val clusterName: String,
val clusterHealth: ClusterHealthStatus,
val clusterHealth: ClusterHealthStatus?,
AWSHurneyt marked this conversation as resolved.
Show resolved Hide resolved
val hubCluster: Boolean,
val indexes: List<ClusterIndex> = listOf(),
val latency: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ import org.opensearch.rest.action.RestToXContentListener
private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java)

class RestGetRemoteIndexesAction : BaseRestHandler() {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
companion object {
val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes"
}

override fun getName(): String {
return "get_remote_indexes_action"
Expand All @@ -32,7 +34,7 @@ class RestGetRemoteIndexesAction : BaseRestHandler() {
}

override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer {
log.debug("${request.method()} $ROUTE")
log.info("${request.method()} $ROUTE")
val indexes = Strings.splitStringByCommaToArray(request.param(GetRemoteIndexesRequest.INDEXES_FIELD, ""))
val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false)
return RestChannelConsumer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,15 @@ class TransportGetRemoteIndexesAction @Inject constructor(
val user = readUserFromThreadContext(client)
if (!validateUserBackendRoles(user, actionListener)) return

if (!request.isValid()) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException(GetRemoteIndexesRequest.INVALID_PATTERN_MESSAGE, RestStatus.BAD_REQUEST)
)
)
return
}

client.threadPool().threadContext.stashContext().use {
scope.launch {
val singleThreadContext = newSingleThreadContext("GetRemoteIndexesActionThread")
Expand All @@ -96,8 +105,7 @@ class TransportGetRemoteIndexesAction @Inject constructor(

var resolveIndexResponse: ResolveIndexAction.Response? = null
try {
resolveIndexResponse =
getRemoteClusters(CrossClusterMonitorUtils.parseIndexesForRemoteSearch(request.indexes, clusterService))
resolveIndexResponse = getRemoteClusters(request.indexes)
} catch (e: Exception) {
log.error("Failed to retrieve indexes for request $request", e)
actionListener.onFailure(AlertingException.wrap(e))
Expand Down Expand Up @@ -151,7 +159,7 @@ class TransportGetRemoteIndexesAction @Inject constructor(
clusterIndexesList.add(
ClusterIndexes(
clusterName = clusterName,
clusterHealth = clusterHealthResponse!!.status,
clusterHealth = clusterHealthResponse?.status,
hubCluster = clusterName == clusterService.clusterName.value(),
indexes = clusterIndexList,
latency = latency
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,14 @@ class IndexUtils {
companion object {
val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""")

/**
* This regex asserts that the string:
* The index pattern can start with an optional period
* The index pattern can contain lowercase letters, digits, underscores, hyphens, asterisks, and periods
* The length of the index pattern must be between 1 and 255 characters
*/
val INDEX_PATTERN_REGEX = Regex("""^(?=.{1,255}$)\.?[a-z0-9_\-\*\.]+$""")
AWSHurneyt marked this conversation as resolved.
Show resolved Hide resolved

const val _META = "_meta"
const val SCHEMA_VERSION = "schema_version"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -966,16 +966,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
num = randomIntBetween(1, 10),
includeWriteIndex = true
),
createIndices: Boolean = true
): MutableMap<String, MutableMap<String, Boolean>> {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
val indexName = createTestIndex(index = it, mapping = "")
val isWriteIndex = indices.getOrDefault(indexName, false)
indicesMap[indexName] = isWriteIndex
if (createIndices) createTestIndex(index = it, mapping = "")
val isWriteIndex = indices.getOrDefault(it, false)
indicesMap[it] = isWriteIndex
val indexMap = mapOf(
"add" to mapOf(
"index" to indexName,
"index" to it,
"alias" to alias,
"is_write_index" to isWriteIndex
)
Expand Down Expand Up @@ -1297,6 +1298,15 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response.asMap()
}

fun RestClient.getSettings(): Map<String, Any> {
val response = this.makeRequest(
"GET",
"_cluster/settings?flat_settings=true"
)
assertEquals(RestStatus.OK, response.restStatus())
return response.asMap()
}

fun RestClient.updateSettings(setting: String, value: Any): Map<String, Any> {
val settings = jsonBuilder()
.startObject()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.test.OpenSearchTestCase

class GetRemoteIndexesActionTests : OpenSearchTestCase() {
private val validPatterns = listOf(
"local-index-name",
"localindexname",
"local-index-*-pattern-*",
"*local-index-*-pattern-*",
"cluster-name:remote-index-name",
"cluster-name:remoteindexname",
"cluster-name:remote-index-*-pattern-*",
"cluster-name:*remote-index-*-pattern-*",
"cluster-*pattern-*:remote-index-name",
"cluster-*pattern-*:remoteindexname",
"cluster-*pattern-*:remote-index-*-pattern-*",
"cluster-*pattern-*:*remote-index-*-pattern-*",
"*cluster-*pattern-*:remote-index-*-pattern-*",
"cluster-*:pattern-*:remote-index-name",
"cluster-*:pattern-*:remoteindexname",
"cluster-*:pattern-*:remote-index-*-pattern-*",
"*cluster-*:pattern-*:remote-index-*-pattern-*",
)

private val invalidPatterns = listOf(
// `<cluster-pattern>` character length less than 1 should return FALSE
":remote-index-name",

// `<cluster-pattern>` character length greater than 255 should return FALSE
"${randomAlphaOfLength(256)}:remote-index-name",

// Invalid characters should return FALSE
"local-index#-name",
"cluster-name:remote-#index-name",
"cluster-#name:remote-index-name",
"cluster-#name:remote-#index-name",

// More than 1 `:` character in `<cluster-pattern>` should return FALSE
"bad:cluster:name:remote-index-name",
)

fun `test get remote indexes action name`() {
assertNotNull(GetRemoteIndexesAction.INSTANCE.name())
assertEquals(GetRemoteIndexesAction.INSTANCE.name(), GetRemoteIndexesAction.NAME)
}

fun `test GetRemoteIndexesRequest isValid with empty array`() {
val request = GetRemoteIndexesRequest(
indexes = emptyList(),
includeMappings = false
)
assertFalse(request.isValid())
}

fun `test GetRemoteIndexesRequest isValid with one valid entry`() {
validPatterns.forEach {
val request = GetRemoteIndexesRequest(
indexes = listOf(it),
includeMappings = false
)
assertTrue("Expected pattern '$it' to be valid.", request.isValid())
}
}

fun `test GetRemoteIndexesRequest isValid with multiple valid entries`() {
val request = GetRemoteIndexesRequest(
indexes = validPatterns,
includeMappings = false
)
assertTrue(request.isValid())
}

fun `test GetRemoteIndexesRequest isValid with one invalid entry`() {
invalidPatterns.forEach {
val request = GetRemoteIndexesRequest(
indexes = listOf(it),
includeMappings = false
)
assertFalse("Expected pattern '$it' to be invalid.", request.isValid())
}
}

fun `test GetRemoteIndexesRequest isValid with multiple invalid entries`() {
val request = GetRemoteIndexesRequest(
indexes = invalidPatterns,
includeMappings = false
)
assertFalse(request.isValid())
}

fun `test GetRemoteIndexesRequest isValid with valid and invalid entries`() {
val request = GetRemoteIndexesRequest(
indexes = validPatterns + invalidPatterns,
includeMappings = false
)
assertFalse(request.isValid())
}
}
Loading
Loading