From a5c9975f879ae807d7f788ff9e635e67e96e0bdf Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Thu, 11 Apr 2024 10:45:06 -0700 Subject: [PATCH 01/14] Fixed issue where InputService wouldn't wait for cluster metrics monitor to finish executing against all clusters. Signed-off-by: AWSHurneyt --- .../org/opensearch/alerting/InputService.kt | 68 ++++++++++--------- 1 file changed, 37 insertions(+), 31 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 5f4941229..71d49cda8 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -7,7 +7,8 @@ package org.opensearch.alerting import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -100,36 +101,7 @@ class InputService( results += searchResponse.convertToMap() } is ClusterMetricsInput -> { - logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) - - val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) - logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) - - val responseMap = mutableMapOf>() - if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) { - client.threadPool().threadContext.stashContext().use { - scope.launch { - input.clusters.forEach { cluster -> - val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) - val response = executeTransportAction(input, targetClient) - // Not all supported API reference the cluster name in their response. - // Mapping each response to the cluster name before adding to results. - // Not adding this same logic for local-only monitors to avoid breaking existing monitors. - responseMap[cluster] = response.toMap() - } - } - } - val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT) - val startTime = Instant.now().toEpochMilli() - while ( - (Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) || - (responseMap.size < input.clusters.size) - ) { /* Wait for responses */ } - results += responseMap - } else { - val response = executeTransportAction(input, client) - results += response.toMap() - } + results += handleClusterMetricsInput(input) } else -> { throw IllegalArgumentException("Unsupported input type: ${input.name()}.") @@ -287,4 +259,38 @@ class InputService( return searchRequest } + + private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList> { + logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) + + val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + + val results = mutableListOf>() + val responseMap = mutableMapOf>() + if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) { + // If remote monitoring is enabled, and the monitor is configured to execute against remote clusters, + // execute the API against each cluster, and compile the results. + client.threadPool().threadContext.stashContext().use { + val singleThreadContext = newSingleThreadContext("ClusterMetricsMonitorThread") + withContext(singleThreadContext) { + it.restore() + input.clusters.forEach { cluster -> + val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) + val response = executeTransportAction(input, targetClient) + // Not all supported API reference the cluster name in their response. + // Mapping each response to the cluster name before adding to results. + // Not adding this same logic for local-only monitors to avoid breaking existing monitors. + responseMap[cluster] = response.toMap() + } + results += responseMap + } + } + } else { + // Else only execute the API against the local cluster. + val response = executeTransportAction(input, client) + results += response.toMap() + } + return results + } } From 02b8387e4290f848f6e38a573cf80b18d77d1ab6 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Thu, 11 Apr 2024 10:46:45 -0700 Subject: [PATCH 02/14] Added input validation for GetRemoteIndexes API, and added related unit and integration tests. Signed-off-by: AWSHurneyt --- .../action/GetRemoteIndexesRequest.kt | 40 +++ .../action/GetRemoteIndexesResponse.kt | 2 +- .../resthandler/RestGetRemoteIndexesAction.kt | 4 +- .../TransportGetRemoteIndexesAction.kt | 14 +- .../opensearch/alerting/util/IndexUtils.kt | 8 + .../alerting/AlertingRestTestCase.kt | 18 +- .../action/GetRemoteIndexesActionTests.kt | 104 ++++++ .../transport/GetRemoteIndexesActionIT.kt | 339 ++++++++++++++++++ 8 files changed, 520 insertions(+), 9 deletions(-) create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt create mode 100644 alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt index 733bc3a04..7d42e97d0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException +import org.opensearch.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import java.io.IOException @@ -36,7 +37,46 @@ class GetRemoteIndexesRequest : ActionRequest { out.writeBoolean(includeMappings) } + /** + * Validates the request [indexes]. + * @return TRUE if all entries are valid; else FALSE. + */ + fun isValid(): Boolean { + return indexes.isNotEmpty() && indexes.all { validPattern(it) } + } + + /** + * Validates individual entries in the request [indexes]. + * + * @param pattern The entry to evaluate. The expected patterns are `` for a local index, and + * `:` for remote indexes. These patterns are consistent with the `GET _resolve/index` API. + * @return TRUE if the entry is valid; else FALSE. + */ + private fun validPattern(pattern: String): Boolean { + // In some situations, `` could contain a `:` character. + // Identifying the `` based on the last occurrence of `:` in the pattern. + val separatorIndex = pattern.lastIndexOf(":") + return if (separatorIndex == -1) { + // Indicates a local index pattern. + INDEX_PATTERN_REGEX.matches(pattern) + } else { + // Indicates a remote index pattern. + val clusterPattern = pattern.substring(0, separatorIndex) + val indexPattern = pattern.substring(separatorIndex + 1) + CLUSTER_PATTERN_REGEX.matches(clusterPattern) && INDEX_PATTERN_REGEX.matches(indexPattern) + } + } + companion object { + /** + * This regex asserts that the string: + * Starts with a lowercase letter, digit, or asterisk + * Contains a sequence of characters followed by an optional colon and another sequence of characters + * The sequences of characters can include lowercase letters, uppercase letters, digits, underscores, asterisks, or hyphens + * The total length of the string can range from 1 to 255 characters + */ + val CLUSTER_PATTERN_REGEX = Regex("^(?=.{1,255}$)[a-z0-9*]([a-zA-Z0-9_*-]*:?[a-zA-Z0-9_*-]*)$") + const val INVALID_PATTERN_MESSAGE = "Indexes includes an invalid pattern." const val INDEXES_FIELD = "indexes" const val INCLUDE_MAPPINGS_FIELD = "include_mappings" } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt index 1572b4228..b92d6f408 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt @@ -42,7 +42,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { data class ClusterIndexes( val clusterName: String, - val clusterHealth: ClusterHealthStatus, + val clusterHealth: ClusterHealthStatus?, val hubCluster: Boolean, val indexes: List = listOf(), val latency: Long diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt index 591ab2c3e..b2ff0b766 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt @@ -19,7 +19,9 @@ import org.opensearch.rest.action.RestToXContentListener private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java) class RestGetRemoteIndexesAction : BaseRestHandler() { - val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes" + companion object { + val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes" + } override fun getName(): String { return "get_remote_indexes_action" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt index 5b35d493a..1703da0e3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt @@ -87,6 +87,15 @@ class TransportGetRemoteIndexesAction @Inject constructor( val user = readUserFromThreadContext(client) if (!validateUserBackendRoles(user, actionListener)) return + if (!request.isValid()) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException(GetRemoteIndexesRequest.INVALID_PATTERN_MESSAGE, RestStatus.BAD_REQUEST) + ) + ) + return + } + client.threadPool().threadContext.stashContext().use { scope.launch { val singleThreadContext = newSingleThreadContext("GetRemoteIndexesActionThread") @@ -96,8 +105,7 @@ class TransportGetRemoteIndexesAction @Inject constructor( var resolveIndexResponse: ResolveIndexAction.Response? = null try { - resolveIndexResponse = - getRemoteClusters(CrossClusterMonitorUtils.parseIndexesForRemoteSearch(request.indexes, clusterService)) + resolveIndexResponse = getRemoteClusters(request.indexes) } catch (e: Exception) { log.error("Failed to retrieve indexes for request $request", e) actionListener.onFailure(AlertingException.wrap(e)) @@ -151,7 +159,7 @@ class TransportGetRemoteIndexesAction @Inject constructor( clusterIndexesList.add( ClusterIndexes( clusterName = clusterName, - clusterHealth = clusterHealthResponse!!.status, + clusterHealth = clusterHealthResponse?.status, hubCluster = clusterName == clusterService.clusterName.value(), indexes = clusterIndexList, latency = latency diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 6949ca58d..576939eee 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -28,6 +28,14 @@ class IndexUtils { companion object { val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""") + /** + * This regex asserts that the string: + * The index pattern can start with an optional period + * The index pattern can contain lowercase letters, digits, underscores, hyphens, asterisks, and periods + * The length of the index pattern must be between 1 and 255 characters + */ + val INDEX_PATTERN_REGEX = Regex("""^(?=.{1,255}$)\.?[a-z0-9_\-\*\.]+$""") + const val _META = "_meta" const val SCHEMA_VERSION = "schema_version" diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 5d33000ef..f1ae92260 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -966,16 +966,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { num = randomIntBetween(1, 10), includeWriteIndex = true ), + createIndices: Boolean = true ): MutableMap> { val indicesMap = mutableMapOf() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { - val indexName = createTestIndex(index = it, mapping = "") - val isWriteIndex = indices.getOrDefault(indexName, false) - indicesMap[indexName] = isWriteIndex + if (createIndices) createTestIndex(index = it, mapping = "") + val isWriteIndex = indices.getOrDefault(it, false) + indicesMap[it] = isWriteIndex val indexMap = mapOf( "add" to mapOf( - "index" to indexName, + "index" to it, "alias" to alias, "is_write_index" to isWriteIndex ) @@ -1297,6 +1298,15 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response.asMap() } + fun RestClient.getSettings(): Map { + val response = this.makeRequest( + "GET", + "_cluster/settings?flat_settings=true" + ) + assertEquals(RestStatus.OK, response.restStatus()) + return response.asMap() + } + fun RestClient.updateSettings(setting: String, value: Any): Map { val settings = jsonBuilder() .startObject() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt new file mode 100644 index 000000000..37c47ed65 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt @@ -0,0 +1,104 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.test.OpenSearchTestCase + +class GetRemoteIndexesActionTests : OpenSearchTestCase() { + private val validPatterns = listOf( + "local-index-name", + "localindexname", + "local-index-*-pattern-*", + "*local-index-*-pattern-*", + "cluster-name:remote-index-name", + "cluster-name:remoteindexname", + "cluster-name:remote-index-*-pattern-*", + "cluster-name:*remote-index-*-pattern-*", + "cluster-*pattern-*:remote-index-name", + "cluster-*pattern-*:remoteindexname", + "cluster-*pattern-*:remote-index-*-pattern-*", + "cluster-*pattern-*:*remote-index-*-pattern-*", + "*cluster-*pattern-*:remote-index-*-pattern-*", + "cluster-*:pattern-*:remote-index-name", + "cluster-*:pattern-*:remoteindexname", + "cluster-*:pattern-*:remote-index-*-pattern-*", + "*cluster-*:pattern-*:remote-index-*-pattern-*", + ) + + private val invalidPatterns = listOf( + // `` character length less than 1 should return FALSE + ":remote-index-name", + + // `` character length greater than 63 should return FALSE + "${randomAlphaOfLength(256)}:remote-index-name", + + // Invalid characters should return FALSE + "local-index#-name", + "cluster-name:remote-#index-name", + "cluster-#name:remote-index-name", + "cluster-#name:remote-#index-name", + + // More than 1 `:` character in `` should return FALSE + "bad:cluster:name:remote-index-name", + ) + + fun `test get remote indexes action name`() { + assertNotNull(GetRemoteIndexesAction.INSTANCE.name()) + assertEquals(GetRemoteIndexesAction.INSTANCE.name(), GetRemoteIndexesAction.NAME) + } + + fun `test GetRemoteIndexesRequest isValid with empty array`() { + val request = GetRemoteIndexesRequest( + indexes = emptyList(), + includeMappings = false + ) + assertFalse(request.isValid()) + } + + fun `test GetRemoteIndexesRequest isValid with one valid entry`() { + validPatterns.forEach { + val request = GetRemoteIndexesRequest( + indexes = listOf(it), + includeMappings = false + ) + assertTrue("Expected pattern '$it' to be valid.", request.isValid()) + } + } + + fun `test GetRemoteIndexesRequest isValid with multiple valid entries`() { + val request = GetRemoteIndexesRequest( + indexes = validPatterns, + includeMappings = false + ) + assertTrue(request.isValid()) + } + + fun `test GetRemoteIndexesRequest isValid with one invalid entry`() { + invalidPatterns.forEach { + val request = GetRemoteIndexesRequest( + indexes = listOf(it), + includeMappings = false + ) + assertFalse("Expected pattern '$it' to be invalid.", request.isValid()) + } + } + + fun `test GetRemoteIndexesRequest isValid with multiple invalid entries`() { + val request = GetRemoteIndexesRequest( + indexes = invalidPatterns, + includeMappings = false + ) + assertFalse(request.isValid()) + } + + fun `test GetRemoteIndexesRequest isValid with valid and invalid entries`() { + val request = GetRemoteIndexesRequest( + indexes = validPatterns + invalidPatterns, + includeMappings = false + ) + assertFalse(request.isValid()) + } +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt new file mode 100644 index 000000000..ecff97733 --- /dev/null +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt @@ -0,0 +1,339 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import org.opensearch.alerting.AlertingRestTestCase +import org.opensearch.alerting.action.GetRemoteIndexesRequest.Companion.INCLUDE_MAPPINGS_FIELD +import org.opensearch.alerting.action.GetRemoteIndexesRequest.Companion.INDEXES_FIELD +import org.opensearch.alerting.action.GetRemoteIndexesRequest.Companion.INVALID_PATTERN_MESSAGE +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex.Companion.INDEX_HEALTH_FIELD +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex.Companion.INDEX_NAME_FIELD +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex.Companion.MAPPINGS_FIELD +import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction +import org.opensearch.alerting.settings.AlertingSettings.Companion.REMOTE_MONITORING_ENABLED +import org.opensearch.client.Response +import org.opensearch.client.ResponseException +import org.opensearch.cluster.health.ClusterHealthStatus +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.commons.alerting.util.string +import org.opensearch.core.rest.RestStatus +import java.util.* + +@Suppress("UNCHECKED_CAST") +class GetRemoteIndexesActionIT : AlertingRestTestCase() { + private var remoteMonitoringEnabled = false + private var remoteClusters = listOf() + + private val mappingFieldToTypePairs1 = listOf( + "timestamp" to "date", + "color" to "keyword", + "message" to "text", + ) + + private val mappingFieldToTypePairs2 = listOf( + "timestamp" to "date", + "message" to "text", + ) + + fun `test with remote monitoring disabled`() { + // Disable remote monitoring if not already disabled + toggleRemoteMonitoring(false) + try { + getRemoteIndexes("$INDEXES_FIELD=*,*:*&$INCLUDE_MAPPINGS_FIELD=false") + fail("Expected 403 Method FORBIDDEN response.") + } catch (e: ResponseException) { + assertEquals(RestStatus.FORBIDDEN, e.response.restStatus()) + assertEquals( + "Remote monitoring is not enabled.", + (e.response.asMap()["error"] as Map)["reason"] + ) + } + } + + fun `test with blank indexes param`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + try { + getRemoteIndexes("$INCLUDE_MAPPINGS_FIELD=false") + fail("Expected 400 Method BAD_REQUEST response.") + } catch (e: ResponseException) { + assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) + assertEquals( + INVALID_PATTERN_MESSAGE, + (e.response.asMap()["error"] as Map)["reason"] + ) + } + } + + fun `test with blank include_mappings param`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + + // Create test indexes + val index1 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs1) + ) + + val index2 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs2) + ) + + val expectedNames = listOf(index1, index2) + + // Call API + val response = getRemoteIndexes("$INDEXES_FIELD=*,*:*") + assertEquals(RestStatus.OK, response.restStatus()) + + val responseMap = response.asMap() as Map> + responseMap.entries.forEach { (clusterName, clusterDetails) -> + // Validate cluster-level response details + assertEquals(clusterName, clusterDetails[ClusterIndexes.CLUSTER_NAME_FIELD]) + assertClusterHealth(clusterDetails[ClusterIndexes.CLUSTER_HEALTH_FIELD] as String) + assertTrue(clusterDetails[ClusterIndexes.HUB_CLUSTER_FIELD] is Boolean) + assertTrue(clusterDetails[ClusterIndexes.INDEX_LATENCY_FIELD] is Number) + + assertNotNull(clusterDetails[ClusterIndexes.INDEXES_FIELD]) + val indexes = clusterDetails[ClusterIndexes.INDEXES_FIELD] as Map> + assertEquals(expectedNames.size, indexes.keys.size) + + // Validate index-level response details + expectedNames.forEach { indexName -> + assertNotNull(indexes[indexName]) + val indexDetails = indexes[indexName]!! + assertEquals(indexName, indexDetails[INDEX_NAME_FIELD]) + assertClusterHealth(indexDetails[INDEX_HEALTH_FIELD] as String) + assertTrue((indexDetails[MAPPINGS_FIELD] as Map).isEmpty()) + } + } + + // Delete test indexes + deleteIndex(index1) + deleteIndex(index2) + } + + fun `test with FALSE include_mappings param`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + + // Create test indexes + val index1 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs1) + ) + + val index2 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs2) + ) + + val expectedNames = listOf(index1, index2) + + // Call API + val response = getRemoteIndexes("$INDEXES_FIELD=*,*:*&$INCLUDE_MAPPINGS_FIELD=false") + assertEquals(RestStatus.OK, response.restStatus()) + + val responseMap = response.asMap() as Map> + responseMap.entries.forEach { (clusterName, clusterDetails) -> + // Validate cluster-level response details + assertEquals(clusterName, clusterDetails[ClusterIndexes.CLUSTER_NAME_FIELD]) + assertClusterHealth(clusterDetails[ClusterIndexes.CLUSTER_HEALTH_FIELD] as String) + assertTrue(clusterDetails[ClusterIndexes.HUB_CLUSTER_FIELD] is Boolean) + assertTrue(clusterDetails[ClusterIndexes.INDEX_LATENCY_FIELD] is Number) + + assertNotNull(clusterDetails[ClusterIndexes.INDEXES_FIELD]) + val indexes = clusterDetails[ClusterIndexes.INDEXES_FIELD] as Map> + assertEquals(expectedNames.size, indexes.keys.size) + + // Validate index-level response details + expectedNames.forEach { indexName -> + assertNotNull(indexes[indexName]) + val indexDetails = indexes[indexName]!! + assertEquals(indexName, indexDetails[INDEX_NAME_FIELD]) + assertClusterHealth(indexDetails[INDEX_HEALTH_FIELD] as String) + assertTrue((indexDetails[MAPPINGS_FIELD] as Map).isEmpty()) + } + } + + // Delete test indexes + deleteIndex(index1) + deleteIndex(index2) + } + + fun `test with TRUE include_mappings param`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + + // Create test indexes + val index1 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs1) + ) + + val index2 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs2) + ) + + val expectedNames = listOf(index1, index2) + + // Call API + val response = getRemoteIndexes("$INDEXES_FIELD=*,*:*&$INCLUDE_MAPPINGS_FIELD=true") + assertEquals(RestStatus.OK, response.restStatus()) + + val responseMap = response.asMap() as Map> + responseMap.entries.forEach { (clusterName, clusterDetails) -> + // Validate cluster-level response details + assertEquals(clusterName, clusterDetails[ClusterIndexes.CLUSTER_NAME_FIELD]) + assertClusterHealth(clusterDetails[ClusterIndexes.CLUSTER_HEALTH_FIELD] as String) + assertTrue(clusterDetails[ClusterIndexes.HUB_CLUSTER_FIELD] is Boolean) + assertTrue(clusterDetails[ClusterIndexes.INDEX_LATENCY_FIELD] is Number) + + assertNotNull(clusterDetails[ClusterIndexes.INDEXES_FIELD]) + val indexes = clusterDetails[ClusterIndexes.INDEXES_FIELD] as Map> + assertEquals(expectedNames.size, indexes.keys.size) + + // Validate index-level response details + expectedNames.forEach { indexName -> + assertNotNull(indexes[indexName]) + val indexDetails = indexes[indexName]!! + assertEquals(indexName, indexDetails[INDEX_NAME_FIELD]) + assertClusterHealth(indexDetails[INDEX_HEALTH_FIELD] as String) + + // Validate index mappings + val mappings = (indexDetails[MAPPINGS_FIELD] as Map)["properties"] as Map> + if (indexName == index1) { + mappingFieldToTypePairs1.forEach { + assertNotNull(mappings[it.first]) + assertEquals(it.second, mappings[it.first]!!["type"]) + } + } else { + mappingFieldToTypePairs2.forEach { + assertNotNull(mappings[it.first]) + assertEquals(it.second, mappings[it.first]!!["type"]) + } + } + } + } + + // Delete test indexes + deleteIndex(index1) + deleteIndex(index2) + } + + fun `test with specific index name`() { + // Enable remote monitoring if not already enabled + toggleRemoteMonitoring(true) + + // Create test indexes + val index1 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs1) + ) + + val index2 = createTestIndex( + index = randomAlphaOfLength(10).lowercase(Locale.ROOT), + mapping = formatMappingsJson(mappingFieldToTypePairs2) + ) + + val expectedNames = listOf(index1) + + // Call API + val response = getRemoteIndexes("$INDEXES_FIELD=$index1:*&$INCLUDE_MAPPINGS_FIELD=true") + assertEquals(RestStatus.OK, response.restStatus()) + + val responseMap = response.asMap() as Map> + responseMap.entries.forEach { (clusterName, clusterDetails) -> + // Validate cluster-level response details + assertEquals(clusterName, clusterDetails[ClusterIndexes.CLUSTER_NAME_FIELD]) + assertClusterHealth(clusterDetails[ClusterIndexes.CLUSTER_HEALTH_FIELD] as String) + assertTrue(clusterDetails[ClusterIndexes.HUB_CLUSTER_FIELD] is Boolean) + assertTrue(clusterDetails[ClusterIndexes.INDEX_LATENCY_FIELD] is Number) + + assertNotNull(clusterDetails[ClusterIndexes.INDEXES_FIELD]) + val indexes = clusterDetails[ClusterIndexes.INDEXES_FIELD] as Map> + assertEquals(expectedNames.size, indexes.keys.size) + + // Validate index-level response details + expectedNames.forEach { indexName -> + assertNotNull(indexes[indexName]) + val indexDetails = indexes[indexName]!! + assertEquals(indexName, indexDetails[INDEX_NAME_FIELD]) + assertClusterHealth(indexDetails[INDEX_HEALTH_FIELD] as String) + val mappings = (indexDetails[MAPPINGS_FIELD] as Map)["properties"] as Map> + + // Validate index mappings + mappingFieldToTypePairs1.forEach { + assertNotNull(mappings[it.first]) + assertEquals(it.second, mappings[it.first]!!["type"]) + } + } + } + + // Delete test indexes + deleteIndex(index1) + deleteIndex(index2) + } + + private fun getRemoteIndexes(params: String): Response { + return client().makeRequest("GET", "${RestGetRemoteIndexesAction.ROUTE}?$params") + } + + private fun toggleRemoteMonitoring(setting: Boolean) { + if (remoteMonitoringEnabled != setting) { + client().updateSettings(REMOTE_MONITORING_ENABLED.key, setting) + + val settings = client().getSettings() + val updatedSetting = getEnabledSetting(settings) + + if (setting) assertTrue(updatedSetting) + else assertFalse(updatedSetting) + + remoteMonitoringEnabled = updatedSetting + + compileRemoteClustersList(settings) + } + } + + private fun compileRemoteClustersList(settings: Map) { + if (remoteClusters.isEmpty()) { + val remotes = settings["persistent.cluster.remote"] as Map? + remoteClusters = remotes?.keys?.toList() ?: emptyList() + } + } + + private fun getEnabledSetting(settings: Map): Boolean { + val persistentSettings = settings["persistent"] as Map + val updatedSetting = persistentSettings[REMOTE_MONITORING_ENABLED.key] + assertNotNull(updatedSetting) + return (updatedSetting as String).toBoolean() + } + + private fun formatMappingsJson(fieldToTypePairs: List>): String { + val builder = XContentFactory.jsonBuilder() + .startObject() + .startObject("properties") + fieldToTypePairs.forEach { + builder.startObject(it.first) + .field("type", it.second) + .endObject() + } + builder.endObject().endObject() + val mappingsJson = builder.string() + return mappingsJson.substring(1, mappingsJson.lastIndex) + } + + private fun assertClusterHealth(health: String) { + try { + ClusterHealthStatus.fromString(health) + } catch (e: IllegalArgumentException) { + fail("Should not throw IllegalArgumentException.") + } + } +} From 1b628b975b575268c23df6f8c2b5debb825b70a4 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Thu, 11 Apr 2024 10:59:22 -0700 Subject: [PATCH 03/14] Removed unused variable, and imports. Signed-off-by: AWSHurneyt --- .../src/main/kotlin/org/opensearch/alerting/InputService.kt | 4 ---- 1 file changed, 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 71d49cda8..640abef7f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -5,8 +5,6 @@ package org.opensearch.alerting -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.newSingleThreadContext import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager @@ -50,8 +48,6 @@ import org.opensearch.script.TemplateScript import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant -private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) - /** Service that handles the collection of input results for Monitor executions */ class InputService( val client: Client, From a3b2202771a627151c40b715ea89a7e4301a8d31 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Thu, 11 Apr 2024 11:18:43 -0700 Subject: [PATCH 04/14] Made initial call to GetRemoteIndexes API log INFO level to capture timestamp of incoming request. Signed-off-by: AWSHurneyt --- .../alerting/resthandler/RestGetRemoteIndexesAction.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt index b2ff0b766..5d91767bd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt @@ -34,7 +34,7 @@ class RestGetRemoteIndexesAction : BaseRestHandler() { } override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { - log.debug("${request.method()} $ROUTE") + log.info("${request.method()} $ROUTE") val indexes = Strings.splitStringByCommaToArray(request.param(GetRemoteIndexesRequest.INDEXES_FIELD, "")) val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false) return RestChannelConsumer { From 16607447ccaf49125d6ab1aa200614fefb443653 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Thu, 11 Apr 2024 11:42:35 -0700 Subject: [PATCH 05/14] Fixed comment. Signed-off-by: AWSHurneyt --- .../opensearch/alerting/action/GetRemoteIndexesActionTests.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt index 37c47ed65..f4639dd6e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/action/GetRemoteIndexesActionTests.kt @@ -32,7 +32,7 @@ class GetRemoteIndexesActionTests : OpenSearchTestCase() { // `` character length less than 1 should return FALSE ":remote-index-name", - // `` character length greater than 63 should return FALSE + // `` character length greater than 255 should return FALSE "${randomAlphaOfLength(256)}:remote-index-name", // Invalid characters should return FALSE From 997408bff870415315380b1b98444301df6564a3 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 13:56:22 -0700 Subject: [PATCH 06/14] Moved some regex to common utils. Signed-off-by: AWSHurneyt --- .../alerting/action/GetRemoteIndexesRequest.kt | 11 ++--------- .../kotlin/org/opensearch/alerting/util/IndexUtils.kt | 10 ---------- .../clusterMetricsMonitorHelpers/CatIndicesHelpers.kt | 2 +- .../clusterMetricsMonitorHelpers/CatShardsHelpers.kt | 2 +- 4 files changed, 4 insertions(+), 21 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt index 7d42e97d0..37c0a8e19 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt @@ -7,7 +7,8 @@ package org.opensearch.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException -import org.opensearch.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX +import org.opensearch.commons.alerting.util.IndexUtils.Companion.CLUSTER_PATTERN_REGEX +import org.opensearch.commons.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import java.io.IOException @@ -68,14 +69,6 @@ class GetRemoteIndexesRequest : ActionRequest { } companion object { - /** - * This regex asserts that the string: - * Starts with a lowercase letter, digit, or asterisk - * Contains a sequence of characters followed by an optional colon and another sequence of characters - * The sequences of characters can include lowercase letters, uppercase letters, digits, underscores, asterisks, or hyphens - * The total length of the string can range from 1 to 255 characters - */ - val CLUSTER_PATTERN_REGEX = Regex("^(?=.{1,255}$)[a-z0-9*]([a-zA-Z0-9_*-]*:?[a-zA-Z0-9_*-]*)$") const val INVALID_PATTERN_MESSAGE = "Indexes includes an invalid pattern." const val INDEXES_FIELD = "indexes" const val INCLUDE_MAPPINGS_FIELD = "include_mappings" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 576939eee..f0848aadb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -26,16 +26,6 @@ import org.opensearch.core.xcontent.XContentParser class IndexUtils { companion object { - val VALID_INDEX_NAME_REGEX = Regex("""^(?![_\-\+])(?!.*\.\.)[^\s,\\\/\*\?"<>|#:\.]{1,255}$""") - - /** - * This regex asserts that the string: - * The index pattern can start with an optional period - * The index pattern can contain lowercase letters, digits, underscores, hyphens, asterisks, and periods - * The length of the index pattern must be between 1 and 255 characters - */ - val INDEX_PATTERN_REGEX = Regex("""^(?=.{1,255}$)\.?[a-z0-9_\-\*\.]+$""") - const val _META = "_meta" const val SCHEMA_VERSION = "schema_version" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt index 8e92b597f..36c09f244 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatIndicesHelpers.kt @@ -19,9 +19,9 @@ import org.opensearch.action.admin.indices.stats.CommonStats import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.action.support.IndicesOptions -import org.opensearch.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.common.time.DateFormatter +import org.opensearch.commons.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX import org.opensearch.core.action.ActionResponse import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt index 12152e69d..f092b7f12 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/clusterMetricsMonitorHelpers/CatShardsHelpers.kt @@ -14,9 +14,9 @@ import org.opensearch.action.admin.indices.stats.CommonStats import org.opensearch.action.admin.indices.stats.IndicesStatsRequest import org.opensearch.action.admin.indices.stats.IndicesStatsResponse import org.opensearch.action.admin.indices.stats.ShardStats -import org.opensearch.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX import org.opensearch.cluster.routing.UnassignedInfo import org.opensearch.common.unit.TimeValue +import org.opensearch.commons.alerting.util.IndexUtils.Companion.VALID_INDEX_NAME_REGEX import org.opensearch.core.action.ActionResponse import org.opensearch.core.common.io.stream.StreamOutput import org.opensearch.core.common.io.stream.Writeable From 97fbd297ee077d24325be9c649dfc9fdb85e378c Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 14:14:52 -0700 Subject: [PATCH 07/14] Renamed cross cluster monitor setting. Signed-off-by: AWSHurneyt --- .../main/kotlin/org/opensearch/alerting/AlertingPlugin.kt | 2 +- .../src/main/kotlin/org/opensearch/alerting/InputService.kt | 2 +- .../org/opensearch/alerting/QueryLevelMonitorRunner.kt | 2 +- .../org/opensearch/alerting/settings/AlertingSettings.kt | 4 ++-- .../alerting/transport/TransportGetRemoteIndexesAction.kt | 6 +++--- .../alerting/transport/GetRemoteIndexesActionIT.kt | 6 +++--- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 911ae5fd7..f8f3867dc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -364,7 +364,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE, - AlertingSettings.REMOTE_MONITORING_ENABLED + AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index 640abef7f..478817c23 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -259,7 +259,7 @@ class InputService( private suspend fun handleClusterMetricsInput(input: ClusterMetricsInput): MutableList> { logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) - val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED) logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) val results = mutableListOf>() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index a77121069..3d9ab5a78 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -71,7 +71,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { val remoteMonitoringEnabled = - monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED) logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) if (remoteMonitoringEnabled) monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 5039ec329..21ef8f1bc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -202,8 +202,8 @@ class AlertingSettings { Setting.Property.Dynamic ) - val REMOTE_MONITORING_ENABLED = Setting.boolSetting( - "plugins.alerting.remote_monitoring_enabled", + val CROSS_CLUSTER_MONITORING_ENABLED = Setting.boolSetting( + "plugins.alerting.cross_cluster_monitoring_enabled", false, Setting.Property.NodeScope, Setting.Property.Dynamic ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt index 1703da0e3..446c96649 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt @@ -27,7 +27,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings -import org.opensearch.alerting.settings.AlertingSettings.Companion.REMOTE_MONITORING_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.CROSS_CLUSTER_MONITORING_ENABLED import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.CrossClusterMonitorUtils import org.opensearch.client.Client @@ -62,10 +62,10 @@ class TransportGetRemoteIndexesAction @Inject constructor( @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) - @Volatile private var remoteMonitoringEnabled = REMOTE_MONITORING_ENABLED.get(settings) + @Volatile private var remoteMonitoringEnabled = CROSS_CLUSTER_MONITORING_ENABLED.get(settings) init { - clusterService.clusterSettings.addSettingsUpdateConsumer(REMOTE_MONITORING_ENABLED) { remoteMonitoringEnabled = it } + clusterService.clusterSettings.addSettingsUpdateConsumer(CROSS_CLUSTER_MONITORING_ENABLED) { remoteMonitoringEnabled = it } listenFilterBySettingChange(clusterService) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt index ecff97733..99556fe3c 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/transport/GetRemoteIndexesActionIT.kt @@ -15,7 +15,7 @@ import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.Cl import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex.Companion.MAPPINGS_FIELD import org.opensearch.alerting.makeRequest import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction -import org.opensearch.alerting.settings.AlertingSettings.Companion.REMOTE_MONITORING_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.CROSS_CLUSTER_MONITORING_ENABLED import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.cluster.health.ClusterHealthStatus @@ -287,7 +287,7 @@ class GetRemoteIndexesActionIT : AlertingRestTestCase() { private fun toggleRemoteMonitoring(setting: Boolean) { if (remoteMonitoringEnabled != setting) { - client().updateSettings(REMOTE_MONITORING_ENABLED.key, setting) + client().updateSettings(CROSS_CLUSTER_MONITORING_ENABLED.key, setting) val settings = client().getSettings() val updatedSetting = getEnabledSetting(settings) @@ -310,7 +310,7 @@ class GetRemoteIndexesActionIT : AlertingRestTestCase() { private fun getEnabledSetting(settings: Map): Boolean { val persistentSettings = settings["persistent"] as Map - val updatedSetting = persistentSettings[REMOTE_MONITORING_ENABLED.key] + val updatedSetting = persistentSettings[CROSS_CLUSTER_MONITORING_ENABLED.key] assertNotNull(updatedSetting) return (updatedSetting as String).toBoolean() } From 7b570a19cbb9db70405713035aab18d82aa418bc Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 14:15:55 -0700 Subject: [PATCH 08/14] Fixed import. Signed-off-by: AWSHurneyt --- .../org/opensearch/alerting/action/GetRemoteIndexesRequest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt index 37c0a8e19..f8f1fd51c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt @@ -7,7 +7,7 @@ package org.opensearch.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException -import org.opensearch.commons.alerting.util.IndexUtils.Companion.CLUSTER_PATTERN_REGEX +import org.opensearch.commons.alerting.util.ValidationHelpers.Companion.CLUSTER_PATTERN_REGEX import org.opensearch.commons.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput From 908b2c30805ab5e442460c412a0689fec47ead5f Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 14:29:35 -0700 Subject: [PATCH 09/14] Fixed import. Signed-off-by: AWSHurneyt --- .../org/opensearch/alerting/action/GetRemoteIndexesRequest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt index f8f1fd51c..4f995e366 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt @@ -7,7 +7,7 @@ package org.opensearch.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException -import org.opensearch.commons.alerting.util.ValidationHelpers.Companion.CLUSTER_PATTERN_REGEX +import org.opensearch.commons.utils.CLUSTER_PATTERN_REGEX import org.opensearch.commons.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput From 7f5cd228e43c818883bb7f5436a3419e4cdb8fba Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 14:34:52 -0700 Subject: [PATCH 10/14] Fixed ktlint error. Signed-off-by: AWSHurneyt --- .../org/opensearch/alerting/action/GetRemoteIndexesRequest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt index 4f995e366..8b371ba26 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt @@ -7,8 +7,8 @@ package org.opensearch.alerting.action import org.opensearch.action.ActionRequest import org.opensearch.action.ActionRequestValidationException -import org.opensearch.commons.utils.CLUSTER_PATTERN_REGEX import org.opensearch.commons.alerting.util.IndexUtils.Companion.INDEX_PATTERN_REGEX +import org.opensearch.commons.utils.CLUSTER_PATTERN_REGEX import org.opensearch.core.common.io.stream.StreamInput import org.opensearch.core.common.io.stream.StreamOutput import java.io.IOException From 41167ffd54f5427e25782a5605bd6c4af74dd42d Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 17:03:08 -0700 Subject: [PATCH 11/14] Added null checks for health statuses. Signed-off-by: AWSHurneyt --- .../alerting/action/GetRemoteIndexesResponse.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt index b92d6f408..0f694bbf5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt @@ -51,7 +51,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { @Throws(IOException::class) constructor(sin: StreamInput) : this( clusterName = sin.readString(), - clusterHealth = sin.readEnum(ClusterHealthStatus::class.java), + clusterHealth = sin.readOptionalWriteable(ClusterHealthStatus::readFrom), hubCluster = sin.readBoolean(), indexes = sin.readList((ClusterIndex.Companion)::readFrom), latency = sin.readLong() @@ -72,7 +72,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { override fun writeTo(out: StreamOutput) { out.writeString(clusterName) - out.writeEnum(clusterHealth) + if (clusterHealth != null) out.writeEnum(clusterHealth) indexes.forEach { it.writeTo(out) } out.writeLong(latency) } @@ -100,7 +100,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { @Throws(IOException::class) constructor(sin: StreamInput) : this( indexName = sin.readString(), - indexHealth = sin.readEnum(ClusterHealthStatus::class.java), + indexHealth = sin.readOptionalWriteable(ClusterHealthStatus::readFrom), mappings = sin.readOptionalWriteable(::MappingMetadata) ) @@ -115,7 +115,7 @@ class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { override fun writeTo(out: StreamOutput) { out.writeString(indexName) - out.writeEnum(indexHealth) + if (indexHealth != null) out.writeEnum(indexHealth) if (mappings != null) out.writeMap(mappings.sourceAsMap) } From 23ed520b51ee5c8237112ded0893808c0c83797e Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 17:23:41 -0700 Subject: [PATCH 12/14] Wrapped Monitor.parse calls in AlertingExceptions so IllegalArgumentExceptions are wrapped in 4xx-level exceptions. Signed-off-by: AWSHurneyt --- .../resthandler/RestExecuteMonitorAction.kt | 10 ++- .../resthandler/RestIndexMonitorAction.kt | 64 +++++++++++-------- 2 files changed, 46 insertions(+), 28 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorAction.kt index 740dcb2d6..f9d88bc5b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestExecuteMonitorAction.kt @@ -9,6 +9,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.action.ExecuteMonitorAction import org.opensearch.alerting.action.ExecuteMonitorRequest +import org.opensearch.alerting.util.AlertingException import org.opensearch.client.node.NodeClient import org.opensearch.common.unit.TimeValue import org.opensearch.commons.alerting.model.Monitor @@ -64,7 +65,14 @@ class RestExecuteMonitorAction : BaseRestHandler() { } else { val xcp = request.contentParser() ensureExpectedToken(START_OBJECT, xcp.nextToken(), xcp) - val monitor = Monitor.parse(xcp, Monitor.NO_ID, Monitor.NO_VERSION) + + val monitor: Monitor + try { + monitor = Monitor.parse(xcp, Monitor.NO_ID, Monitor.NO_VERSION) + } catch (e: Exception) { + throw AlertingException.wrap(e) + } + val execMonitorRequest = ExecuteMonitorRequest(dryrun, requestEnd, null, monitor) client.execute(ExecuteMonitorAction.INSTANCE, execMonitorRequest, RestToXContentListener(channel)) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt index 5e4a8c155..27c652b59 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -8,6 +8,7 @@ import org.apache.logging.log4j.LogManager import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.AlertingPlugin import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IF_PRIMARY_TERM import org.opensearch.alerting.util.IF_SEQ_NO import org.opensearch.alerting.util.REFRESH @@ -77,48 +78,57 @@ class RestIndexMonitorAction : BaseRestHandler() { val id = request.param("monitorID", Monitor.NO_ID) if (request.method() == PUT && Monitor.NO_ID == id) { - throw IllegalArgumentException("Missing monitor ID") + throw AlertingException.wrap(IllegalArgumentException("Missing monitor ID")) } // Validate request by parsing JSON to Monitor val xcp = request.contentParser() ensureExpectedToken(Token.START_OBJECT, xcp.nextToken(), xcp) - val monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now()) - val rbacRoles = request.contentParser().map()["rbac_roles"] as List? - - validateDataSources(monitor) - val monitorType = monitor.monitorType - val triggers = monitor.triggers - when (monitorType) { - Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { - triggers.forEach { - if (it !is QueryLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor") + + val monitor: Monitor + val rbacRoles: List? + try { + monitor = Monitor.parse(xcp, id).copy(lastUpdateTime = Instant.now()) + + rbacRoles = request.contentParser().map()["rbac_roles"] as List? + + validateDataSources(monitor) + val monitorType = monitor.monitorType + val triggers = monitor.triggers + when (monitorType) { + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { + triggers.forEach { + if (it !is QueryLevelTrigger) { + throw (IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor")) + } } } - } - Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> { - triggers.forEach { - if (it !is BucketLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor") + Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> { + triggers.forEach { + if (it !is BucketLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor") + } } } - } - Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { - triggers.forEach { - if (it !is QueryLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor") + Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { + triggers.forEach { + if (it !is QueryLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor") + } } } - } - Monitor.MonitorType.DOC_LEVEL_MONITOR -> { - triggers.forEach { - if (it !is DocumentLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor") + Monitor.MonitorType.DOC_LEVEL_MONITOR -> { + triggers.forEach { + if (it !is DocumentLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor") + } } } } + } catch (e: Exception) { + throw AlertingException.wrap(e) } + val seqNo = request.paramAsLong(IF_SEQ_NO, SequenceNumbers.UNASSIGNED_SEQ_NO) val primaryTerm = request.paramAsLong(IF_PRIMARY_TERM, SequenceNumbers.UNASSIGNED_PRIMARY_TERM) val refreshPolicy = if (request.hasParam(REFRESH)) { From 66d7e05e44f5465c7535b61fac1a0550d8a5a55d Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 17:38:18 -0700 Subject: [PATCH 13/14] Fixed merge error. Signed-off-by: AWSHurneyt --- .../opensearch/alerting/resthandler/RestIndexMonitorAction.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt index 5b85583e4..e4c7eb1da 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -121,13 +121,13 @@ class RestIndexMonitorAction : BaseRestHandler() { } } Monitor.MonitorType.DOC_LEVEL_MONITOR -> { + validateDocLevelQueryName(monitor) triggers.forEach { if (it !is DocumentLevelTrigger) { throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor") } } } - validateDocLevelQueryName(monitor) } } catch (e: Exception) { throw AlertingException.wrap(e) From 497a241b950b3d1486db7d41453b1a1a6adc5d24 Mon Sep 17 00:00:00 2001 From: AWSHurneyt Date: Fri, 12 Apr 2024 18:04:17 -0700 Subject: [PATCH 14/14] Fixed test. Signed-off-by: AWSHurneyt --- .../kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 83f5d5b76..f53438c1f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2033,7 +2033,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { ) fail("Expected create monitor to fail") } catch (e: ResponseException) { - assertTrue(e.message!!.contains("illegal_argument_exception")) + assertTrue(e.message!!.contains("alerting_exception")) } }