Skip to content

Commit

Permalink
optimize doc-level monitor execution workflow for datastreams (#1302)
Browse files Browse the repository at this point in the history
* optimize doc-level monitor execution for datastreams

Signed-off-by: Subhobrata Dey <[email protected]>

* add more tests to address comments

Signed-off-by: Subhobrata Dey <[email protected]>

* add integTest for multiple datastreams inside a single index pattern

* add integTest for multiple datastreams inside a single index pattern

Signed-off-by: Subhobrata Dey <[email protected]>

---------

Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 authored Dec 8, 2023
1 parent b7c1823 commit 903a6d9
Show file tree
Hide file tree
Showing 6 changed files with 559 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val concreteIndices = IndexUtils.resolveAllIndices(
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (concreteIndices.isEmpty()) {
if (allConcreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}
Expand All @@ -141,7 +141,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// cleanup old indices that are not monitored anymore from the same monitor
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
for (ind in runContextKeys) {
if (!concreteIndices.contains(ind)) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}
Expand All @@ -150,11 +150,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

docLevelMonitorInput.indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
var lastWriteIndex: String? = null
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
Expand All @@ -179,7 +194,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -214,11 +215,19 @@ object MonitorMetadataService :
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
if (index == null) return mutableMapOf()
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)

val indices = mutableListOf<String>()
if (IndexUtils.isAlias(index, clusterService.state()) ||
IndexUtils.isDataStream(index, clusterService.state())
) {
IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
} else {
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
indices.addAll(getIndexResponse.indices())
}
val indices = getIndexResponse.indices()

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,25 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ

// Run through each backing index and apply appropriate mappings to query index
indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val updatedProperties = mutableMapOf<String, Any>()
val allFlattenPaths = mutableSetOf<Pair<String, String>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -152,5 +153,47 @@ class IndexUtils {

return result
}

@JvmStatic
fun isDataStream(name: String, clusterState: ClusterState): Boolean {
return clusterState.metadata().dataStreams().containsKey(name)
}

@JvmStatic
fun isAlias(name: String, clusterState: ClusterState): Boolean {
return clusterState.metadata().hasAlias(name)
}

@JvmStatic
fun getWriteIndex(index: String, clusterState: ClusterState): String? {
if (isAlias(index, clusterState) || isDataStream(index, clusterState)) {
val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex
if (metadata != null) {
return metadata.index.name
}
}
return null
}

@JvmStatic
fun getNewestIndicesByCreationDate(concreteIndices: List<String>, clusterState: ClusterState, thresholdDate: Long): List<String> {
val filteredIndices = mutableListOf<String>()
val lookup = clusterState.metadata().indicesLookup
concreteIndices.forEach { indexName ->
val index = lookup[indexName]
val indexMetadata = clusterState.metadata.index(indexName)
if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) {
if (indexMetadata.creationDate >= thresholdDate) {
filteredIndices.add(indexName)
}
}
}
return filteredIndices
}

@JvmStatic
fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long {
return clusterState.metadata.index(index).creationDate
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashMap

/**
* Superclass for tests that interact with an external test cluster using OpenSearch's RestClient
Expand Down Expand Up @@ -893,7 +895,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody)
assertTrue(
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
Expand Down Expand Up @@ -928,6 +930,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return index
}

protected fun createTestIndex(index: String, mapping: String?, alias: String): String {
createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias)
return index
}

protected fun createTestConfigIndex(index: String = "." + randomAlphaOfLength(10).lowercase(Locale.ROOT)): String {
try {
createIndex(
Expand Down Expand Up @@ -963,7 +970,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "")
val indexName = createTestIndex(index = it, mapping = "")
val isWriteIndex = indices.getOrDefault(indexName, false)
indicesMap[indexName] = isWriteIndex
val indexMap = mapOf(
Expand All @@ -980,17 +987,155 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return mutableMapOf(alias to indicesMap)
}

protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) {
val indexPattern = "$datastream*"
var componentTemplateMappings = "\"properties\": {" +
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
"}"
if (mappings != null) {
componentTemplateMappings = mappings
}
if (useComponentTemplate) {
// Setup index_template
createComponentTemplateWithMappings(
"my_ds_component_template-$datastream",
componentTemplateMappings
)
}
createComposableIndexTemplate(
"my_index_template_ds-$datastream",
listOf(indexPattern),
(if (useComponentTemplate) "my_ds_component_template-$datastream" else null),
mappings,
true,
0
)
createDataStream(datastream)
}

protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) {
client().makeRequest("PUT", "_data_stream/$datastream")
}

protected fun deleteDataStream(datastream: String) {
client().makeRequest("DELETE", "_data_stream/$datastream")
}

protected fun createIndexAlias(alias: String, mappings: String?) {
val indexPattern = "$alias*"
var componentTemplateMappings = "\"properties\": {" +
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
"}"
if (mappings != null) {
componentTemplateMappings = mappings
}
createComponentTemplateWithMappings(
"my_alias_component_template-$alias",
componentTemplateMappings
)
createComposableIndexTemplate(
"my_index_template_alias-$alias",
listOf(indexPattern),
"my_alias_component_template-$alias",
mappings,
false,
0
)
createTestIndex(
"$alias-000001",
null,
"""
"$alias": {
"is_write_index": true
}
""".trimIndent()
)
}

protected fun deleteIndexAlias(alias: String) {
client().makeRequest("DELETE", "$alias*/_alias/$alias")
}

protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) {
val body = """{"template" : { "mappings": {$mappings} }}"""
client().makeRequest(
"PUT",
"_component_template/$componentTemplateName",
emptyMap(),
StringEntity(body, ContentType.APPLICATION_JSON),
BasicHeader("Content-Type", "application/json")
)
}

protected fun createComposableIndexTemplate(
templateName: String,
indexPatterns: List<String>,
componentTemplateName: String?,
mappings: String?,
isDataStream: Boolean,
priority: Int
) {
var body = "{\n"
if (isDataStream) {
body += "\"data_stream\": { },"
}
body += "\"index_patterns\": [" +
indexPatterns.stream().collect(
Collectors.joining(",", "\"", "\"")
) + "],"
if (componentTemplateName == null) {
body += "\"template\": {\"mappings\": {$mappings}},"
}
if (componentTemplateName != null) {
body += "\"composed_of\": [\"$componentTemplateName\"],"
}
body += "\"priority\":$priority}"
client().makeRequest(
"PUT",
"_index_template/$templateName",
emptyMap(),
StringEntity(body, APPLICATION_JSON),
BasicHeader("Content-Type", "application/json")
)
}

protected fun getDatastreamWriteIndex(datastream: String): String {
val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null)
var respAsMap = responseAsMap(response)
if (respAsMap.containsKey("data_streams")) {
respAsMap = (respAsMap["data_streams"] as ArrayList<HashMap<String, *>>)[0]
val indices = respAsMap["indices"] as List<Map<String, Any>>
val index = indices.last()
return index["index_name"] as String
} else {
respAsMap = respAsMap[datastream] as Map<String, Object>
}
val indices = respAsMap["indices"] as Array<String>
return indices.last()
}

protected fun rolloverDatastream(datastream: String) {
client().makeRequest(
"POST",
datastream + "/_rollover",
emptyMap(),
null
)
}

protected fun randomAliasIndices(
alias: String,
num: Int = randomIntBetween(1, 10),
includeWriteIndex: Boolean = true,
): Map<String, Boolean> {
val indices = mutableMapOf<String, Boolean>()
val writeIndex = randomIntBetween(0, num)
val writeIndex = randomIntBetween(0, num - 1)
for (i: Int in 0 until num) {
var indexName = randomAlphaOfLength(10)
var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
while (indexName.equals(alias) || indices.containsKey(indexName))
indexName = randomAlphaOfLength(10)
indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
indices[indexName] = includeWriteIndex && i == writeIndex
}
return indices
Expand Down
Loading

0 comments on commit 903a6d9

Please sign in to comment.