From d2a590e744c920820536efc5e1a88a4185eeed74 Mon Sep 17 00:00:00 2001 From: Riya <69919272+riysaxen-amzn@users.noreply.github.com> Date: Thu, 4 Apr 2024 11:03:32 -0700 Subject: [PATCH] =?UTF-8?q?set=20the=20cancelAfterTimeInterval=20parameter?= =?UTF-8?q?=20on=20SearchRequest=20object=20in=20=E2=80=A6=20(#1366)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * set the cancelAfterTimeInterval parameter on SearchRequest object in all MonitorRunners Signed-off-by: Riya Saxena * address the comments for pr 1366 Signed-off-by: Riya Saxena * address the comments for pr 1366 Signed-off-by: Riya Saxena * fix merge conflicts * fix merge conflicts Signed-off-by: Riya Saxena * fix merge conflicts Signed-off-by: Riya Saxena --------- Signed-off-by: Riya Saxena Signed-off-by: Riya <69919272+riysaxen-amzn@users.noreply.github.com> --- .../opensearch/alerting/BucketLevelMonitorRunner.kt | 5 +++++ .../alerting/DocumentLevelMonitorRunner.kt | 12 ++++++++++-- .../alerting/MonitorRunnerExecutionContext.kt | 1 + .../org/opensearch/alerting/MonitorRunnerService.kt | 7 +++++++ .../transport/TransportGetFindingsAction.kt | 1 - .../org/opensearch/alerting/util/AlertingUtils.kt | 13 +++++++++++++ .../org/opensearch/alerting/AlertServiceTests.kt | 1 - 7 files changed, 36 insertions(+), 4 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt index 4ef74127d..47fef3edb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/BucketLevelMonitorRunner.kt @@ -26,10 +26,12 @@ import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.util.getBucketKeysHash +import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.getCombinedTriggerRunResult import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.client.Client +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert @@ -446,6 +448,9 @@ object BucketLevelMonitorRunner : MonitorRunner() { queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues)) sr.source().query(queryBuilder) } + sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes( + getCancelAfterTimeInterval() + ) val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) } return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId) } else { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 7262b9260..51a32b642 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -33,6 +33,7 @@ import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy +import org.opensearch.alerting.util.getCancelAfterTimeInterval import org.opensearch.alerting.util.parseSampleDocTags import org.opensearch.alerting.util.printsSampleDocData import org.opensearch.alerting.workflow.WorkflowRunContext @@ -41,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.AlertingPluginInterface @@ -116,7 +118,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { logger.error("Error setting up alerts and findings indices for monitor: $id", e) monitorResult = monitorResult.copy(error = AlertingException.wrap(e)) } - try { validate(monitor) } catch (e: Exception) { @@ -881,7 +882,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() { .size(monitorCtx.docLevelMonitorShardFetchSize) ) .preference(Preference.PRIMARY_FIRST.type()) - + request.cancelAfterTimeInterval = TimeValue.timeValueMinutes( + getCancelAfterTimeInterval() + ) if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) { request.source().fetchSource(false) for (field in fieldsToFetch) { @@ -936,7 +939,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() { "$monitorInputIndices against query index $queryIndices" ) var response: SearchResponse + try { + searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes( + getCancelAfterTimeInterval() + ) + response = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index f289aa390..eefad1b6b 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -53,6 +53,7 @@ data class MonitorRunnerExecutionContext( @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, @Volatile var indexTimeout: TimeValue? = null, + @Volatile var cancelAfterTimeInterval: TimeValue? = null, @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, @Volatile var fetchOnlyQueryFieldNames: Boolean = true, @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 0763bcae4..4ff0bee73 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.bulk.BackoffPolicy +import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts @@ -153,6 +154,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon ALERT_BACKOFF_MILLIS.get(monitorCtx.settings), ALERT_BACKOFF_COUNT.get(monitorCtx.settings) ) + + monitorCtx.cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count -> monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count) } @@ -169,6 +173,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count) } + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) { + monitorCtx.cancelAfterTimeInterval = it + } monitorCtx.allowList = ALLOW_LIST.get(monitorCtx.settings) monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) { monitorCtx.allowList = it diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt index d837dc57b..6835a9625 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsAction.kt @@ -132,7 +132,6 @@ class TransportGetFindingsSearchAction @Inject constructor( ) } searchSourceBuilder.query(queryBuilder).trackTotalHits(true) - client.threadPool().threadContext.stashContext().use { scope.launch { try { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index ba1ad261d..355945939 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -6,6 +6,8 @@ package org.opensearch.alerting.util import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertService +import org.opensearch.alerting.MonitorRunnerService import org.opensearch.alerting.model.AlertContext import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.destination.Destination @@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope import org.opensearch.commons.alerting.util.isBucketLevelMonitor import org.opensearch.script.Script +import kotlin.math.max private val logger = LogManager.getLogger("AlertingUtils") @@ -172,6 +175,16 @@ inline fun T.use(block: (T) -> R): R { } } +fun getCancelAfterTimeInterval(): Long { + // The default value for the cancelAfterTimeInterval is -1 and so, in this case + // we should ignore processing on the value + val givenInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval!!.minutes + if (givenInterval == -1L) { + return givenInterval + } + return max(givenInterval, AlertService.ALERTS_SEARCH_TIMEOUT.minutes) +} + /** * Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when * it's being closed due to some other [cause] exception occurred. diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt index 5c0e12b66..2da1c5830 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertServiceTests.kt @@ -47,7 +47,6 @@ class AlertServiceTests : OpenSearchTestCase() { xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java) threadPool = Mockito.mock(ThreadPool::class.java) clusterService = Mockito.mock(ClusterService::class.java) - settings = Settings.builder().build() val settingSet = hashSetOf>() settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)