Skip to content

Commit

Permalink
set the cancelAfterTimeInterval parameter on SearchRequest object in …
Browse files Browse the repository at this point in the history
…all MonitorRunners

Signed-off-by: Riya Saxena <[email protected]>
  • Loading branch information
riysaxen-amzn committed Jan 4, 2024
1 parent 526433a commit a865512
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 4 deletions.
11 changes: 11 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.math.max

/** Service that handles CRUD operations for alerts */
class AlertService(
Expand Down Expand Up @@ -876,6 +877,16 @@ class AlertService(
return searchResponse
}

/**
 * Resolves the cancel-after interval, in whole minutes, that monitor search requests should use.
 *
 * Reads the interval currently held in [MonitorRunnerService.monitorCtx]. When that interval is
 * unset (`null`) or negative, cancellation is treated as disabled and `-1` is returned unchanged.
 * Otherwise the configured interval is raised to at least `ALERTS_SEARCH_TIMEOUT`.
 *
 * @return `-1` when cancellation is disabled or not yet configured; otherwise the larger of the
 * configured interval and `ALERTS_SEARCH_TIMEOUT`, both expressed in whole minutes.
 */
fun getCancelAfterTimeInterval(): Long {
    // The context field is declared nullable and defaults to null until the runner service has
    // populated it, so read it without `!!` instead of risking a NullPointerException.
    val configuredInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval

    // Detect the "disabled" state on the millisecond value rather than on whole minutes:
    // converting to minutes truncates toward zero, so a negative interval expressed in a finer
    // unit (e.g. -1ms) would read as 0 minutes, slip past a `== -1` check and be bumped up to
    // ALERTS_SEARCH_TIMEOUT, silently enabling cancellation.
    // NOTE(review): presumably the cluster default is a negative sub-minute value; confirm
    // against the definition of SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.
    if (configuredInterval == null || configuredInterval.millis < 0) {
        return -1L
    }

    return max(configuredInterval.minutes, ALERTS_SEARCH_TIMEOUT.minutes)
}

private fun List<AlertError>?.update(alertError: AlertError?): List<AlertError> {
return when {
this == null && alertError == null -> emptyList()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -409,6 +410,10 @@ object BucketLevelMonitorRunner : MonitorRunner() {
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
sr.source().query(queryBuilder)
}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
MonitorRunnerService
.monitorCtx.alertService!!.getCancelAfterTimeInterval()
)
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
Expand Down Expand Up @@ -64,7 +65,6 @@ import java.io.IOException
import java.time.Instant
import java.util.UUID
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)

Expand All @@ -90,7 +90,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

try {
validate(monitor)
} catch (e: Exception) {
Expand Down Expand Up @@ -583,7 +582,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.query(QueryBuilders.matchAllQuery())
.size(1)
)
request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
MonitorRunnerService
.monitorCtx.alertService!!.getCancelAfterTimeInterval()
)

val response: SearchResponse = client.suspendUntil { client.search(request, it) }

if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
Expand Down Expand Up @@ -668,6 +673,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
.size(10000) // fixme: make this configurable.
)
.preference(Preference.PRIMARY_FIRST.type())

request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
MonitorRunnerService
.monitorCtx.alertService!!.getCancelAfterTimeInterval()
)

val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) }
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to search shard: $shard")
Expand Down Expand Up @@ -706,7 +717,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
searchRequest.source(searchSourceBuilder)

var response: SearchResponse

try {
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
MonitorRunnerService
.monitorCtx.alertService!!.getCancelAfterTimeInterval()
)

response = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext(
@Volatile var destinationContextFactory: DestinationContextFactory? = null,

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var cancelAfterTimeInterval: TimeValue? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
Expand Down Expand Up @@ -138,6 +139,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
ALERT_BACKOFF_MILLIS.get(monitorCtx.settings),
ALERT_BACKOFF_COUNT.get(monitorCtx.settings)
)

monitorCtx.cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(monitorCtx.settings)

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count ->
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
Expand All @@ -154,6 +158,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
}

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) {
monitorCtx.cancelAfterTimeInterval = it
}
monitorCtx.allowList = ALLOW_LIST.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) {
monitorCtx.allowList = it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting

import org.junit.Before
import org.junit.Ignore
import org.mockito.Mockito
import org.opensearch.Version
import org.opensearch.alerting.alerts.AlertIndices
Expand All @@ -17,6 +18,7 @@ import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.ClusterSettings
import org.opensearch.common.settings.Setting
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.commons.alerting.model.AggregationResultBucket
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
Expand All @@ -28,6 +30,7 @@ import org.opensearch.test.OpenSearchTestCase
import org.opensearch.threadpool.ThreadPool
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit

class AlertServiceTests : OpenSearchTestCase() {

Expand All @@ -39,6 +42,7 @@ class AlertServiceTests : OpenSearchTestCase() {

private lateinit var alertIndices: AlertIndices
private lateinit var alertService: AlertService
private lateinit var monitorRunnerService: MonitorRunnerService

@Before
fun setup() {
Expand All @@ -47,7 +51,6 @@ class AlertServiceTests : OpenSearchTestCase() {
xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java)
threadPool = Mockito.mock(ThreadPool::class.java)
clusterService = Mockito.mock(ClusterService::class.java)

settings = Settings.builder().build()
val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand Down Expand Up @@ -216,6 +219,30 @@ class AlertServiceTests : OpenSearchTestCase() {
assertAlertsExistForBucketKeys(emptyList(), completedAlerts)
}

// TODO: re-enable once verified. MonitorRunnerService is a Kotlin `object`, so its context is
// set directly on the singleton here instead of going through a mock.
@Ignore("The test is failing because can't mock final class MockerRunnerService.")
fun `test getCancelAfterTimeInterval with default value`() {
    // AlertService reads the interval from the MonitorRunnerService singleton, so configure that
    // singleton directly rather than going through an unassigned lateinit property.
    val monitorCtx = MonitorRunnerService.monitorCtx
    val previousInterval = monitorCtx.cancelAfterTimeInterval
    try {
        // -1 is the "cancellation disabled" value.
        monitorCtx.cancelAfterTimeInterval = TimeValue(-1, TimeUnit.MINUTES)

        val result = alertService.getCancelAfterTimeInterval()

        // The disabled value must be passed through untouched.
        assertEquals(-1L, result)
    } finally {
        // The context is shared process-wide; restore it so other tests are unaffected.
        monitorCtx.cancelAfterTimeInterval = previousInterval
    }
}

// TODO: re-enable once verified. MonitorRunnerService is a Kotlin `object`, so its context is
// set directly on the singleton here instead of going through a mock.
@Ignore("The test is failing because can't mock final class MockerRunnerService.")
fun `test getCancelAfterTimeInterval with custom value`() {
    // AlertService reads the interval from the MonitorRunnerService singleton, so configure that
    // singleton directly rather than going through an unassigned lateinit property.
    val monitorCtx = MonitorRunnerService.monitorCtx
    val previousInterval = monitorCtx.cancelAfterTimeInterval
    try {
        // Configure a 10 minute cancel-after interval.
        monitorCtx.cancelAfterTimeInterval = TimeValue(10, TimeUnit.MINUTES)

        val result = alertService.getCancelAfterTimeInterval()

        // Expect the maximum of 10 minutes and ALERTS_SEARCH_TIMEOUT.minutes.
        // NOTE(review): asserting 10 assumes ALERTS_SEARCH_TIMEOUT is at most 10 minutes; confirm.
        assertEquals(10L, result)
    } finally {
        // The context is shared process-wide; restore it so other tests are unaffected.
        monitorCtx.cancelAfterTimeInterval = previousInterval
    }
}

private fun createCurrentAlertsFromBucketKeys(
monitor: Monitor,
trigger: BucketLevelTrigger,
Expand Down

0 comments on commit a865512

Please sign in to comment.