From 09d93e31933aba57219bfbf1e0953993163603af Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Fri, 8 Mar 2024 18:26:47 -0800 Subject: [PATCH] add distributed locking to jobs in alerting Signed-off-by: Subhobrata Dey --- .../org/opensearch/alerting/AlertingPlugin.kt | 8 +- .../alerting/MonitorRunnerExecutionContext.kt | 4 +- .../alerting/MonitorRunnerService.kt | 81 +++-- .../alerting/service/DeleteMonitorService.kt | 10 + .../TransportDeleteWorkflowAction.kt | 9 + .../alerting/DocumentMonitorRunnerIT.kt | 81 +++++ .../alerting/resthandler/WorkflowRestApiIT.kt | 43 +++ .../alerting/core/lock/LockModel.kt | 117 +++++++ .../alerting/core/lock/LockService.kt | 311 ++++++++++++++++++ .../opensearch-alerting-config-lock.json | 18 + 10 files changed, 656 insertions(+), 26 deletions(-) create mode 100644 core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt create mode 100644 core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt create mode 100644 core/src/main/resources/mappings/opensearch-alerting-config-lock.json diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 73f79713c..a591e31c2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -21,6 +21,7 @@ import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler import org.opensearch.alerting.core.schedule.JobScheduler import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings @@ -259,6 +260,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ): Collection { // Need to figure out how to use the OpenSearch DI classes rather than handwiring things here. val settings = environment.settings() + val lockService = LockService(client, clusterService) alertIndices = AlertIndices(settings, client, threadPool, clusterService) val alertService = AlertService(client, xContentRegistry, alertIndices) val triggerService = TriggerService(scriptService) @@ -277,6 +279,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) .registerJvmStats(JvmStats.jvmStats()) .registerWorkflowService(WorkflowService(client, xContentRegistry)) + .registerLockService(lockService) .registerConsumers() .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) @@ -301,7 +304,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R settings ) - DeleteMonitorService.initialize(client) + DeleteMonitorService.initialize(client, lockService) return listOf( sweeper, @@ -311,7 +314,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R docLevelMonitorQueries, destinationMigrationCoordinator, alertService, - triggerService + triggerService, + lockService ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index bef1155c9..459743f9d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.DestinationSettings @@ -60,5 +61,6 @@ data class MonitorRunnerExecutionContext( AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, @Volatile var docLevelMonitorShardFetchSize: Int = AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, - @Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES + @Volatile var totalNodesFanOut: Int = AlertingSettings.DEFAULT_FAN_OUT_NODES, + @Volatile var lockService: LockService? = null ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 8668b4e53..5471b562c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -24,6 +24,8 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts import org.opensearch.alerting.core.JobRunner import org.opensearch.alerting.core.ScheduledJobIndices +import org.opensearch.alerting.core.lock.LockModel +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.WorkflowRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory @@ -242,6 +244,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerLockService(lockService: LockService): MonitorRunnerService { + monitorCtx.lockService = lockService + return this + } + // Updates destination settings when the reload API is called so that new keystore values are visible fun reloadDestinationSettings(settings: Settings) { monitorCtx.destinationSettings = loadDestinationSettings(settings) @@ -313,36 +320,64 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon when (job) { is Workflow -> { launch { - monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.execute( - ExecuteWorkflowAction.INSTANCE, - ExecuteWorkflowRequest( - false, - TimeValue(periodEnd.toEpochMilli()), - job.id, - job, - TimeValue(periodStart.toEpochMilli()) - ), - it + var lock: LockModel? = null + try { + lock = monitorCtx.client!!.suspendUntil { + monitorCtx.lockService!!.acquireLock(job, it) + } ?: return@launch + logger.debug("lock ${lock!!.lockId} acquired") + logger.debug( + "PERF_DEBUG: executing workflow ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id ) + monitorCtx.client!!.suspendUntil { + monitorCtx.client!!.execute( + ExecuteWorkflowAction.INSTANCE, + ExecuteWorkflowRequest( + false, + TimeValue(periodEnd.toEpochMilli()), + job.id, + job, + TimeValue(periodStart.toEpochMilli()) + ), + it + ) + } + } finally { + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } + logger.debug("lock ${lock!!.lockId} released") } } } is Monitor -> { launch { - val executeMonitorRequest = ExecuteMonitorRequest( - false, - TimeValue(periodEnd.toEpochMilli()), - job.id, - job, - TimeValue(periodStart.toEpochMilli()) - ) - monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.execute( - ExecuteMonitorAction.INSTANCE, - executeMonitorRequest, - it + var lock: LockModel? = null + try { + lock = monitorCtx.client!!.suspendUntil { + monitorCtx.lockService!!.acquireLock(job, it) + } ?: return@launch + logger.debug("lock ${lock!!.lockId} acquired") + logger.debug( + "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) + val executeMonitorRequest = ExecuteMonitorRequest( + false, + TimeValue(periodEnd.toEpochMilli()), + job.id, + job, + TimeValue(periodStart.toEpochMilli()) ) + monitorCtx.client!!.suspendUntil { + monitorCtx.client!!.execute( + ExecuteMonitorAction.INSTANCE, + executeMonitorRequest, + it + ) + } + } finally { + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } + logger.debug("lock ${lock!!.lockId} released") } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt index b26ae2473..2ba1fbd17 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt @@ -22,6 +22,8 @@ import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorMetadataService +import org.opensearch.alerting.core.lock.LockModel +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH @@ -48,11 +50,14 @@ object DeleteMonitorService : private val log = LogManager.getLogger(this.javaClass) private lateinit var client: Client + private lateinit var lockService: LockService fun initialize( client: Client, + lockService: LockService ) { DeleteMonitorService.client = client + DeleteMonitorService.lockService = lockService } /** @@ -64,6 +69,7 @@ object DeleteMonitorService : val deleteResponse = deleteMonitor(monitor.id, refreshPolicy) deleteDocLevelMonitorQueriesAndIndices(monitor) deleteMetadata(monitor) + deleteLock(monitor) return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version) } @@ -147,6 +153,10 @@ object DeleteMonitorService : } } + private suspend fun deleteLock(monitor: Monitor) { + client.suspendUntil { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) } + } + /** * Checks if the monitor is part of the workflow * diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt index 5a9938f56..f5c062c88 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -23,6 +23,8 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest.RefreshPolicy +import org.opensearch.alerting.core.lock.LockModel +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.WorkflowMetadata import org.opensearch.alerting.opensearchapi.addFilter @@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor( val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry, + val lockService: LockService ) : HandledTransportAction( AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest ), @@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor( } catch (t: Exception) { log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t) } + try { + // Delete the workflow lock + client.suspendUntil { lockService.deleteLock(LockModel.generateLockId(workflowId), it) } + } catch (t: Exception) { + log.error("Failed to delete workflow lock for $workflowId") + } actionListener.onResponse(deleteWorkflowResponse) } else { actionListener.onFailure( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index d00337ea7..899e656e7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -10,6 +10,7 @@ import org.apache.http.entity.StringEntity import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.client.Response import org.opensearch.client.ResponseException @@ -473,6 +474,86 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) } + fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "__lag-monitor-test__", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + Thread.sleep(240000) + + val inputMap = HashMap() + inputMap["searchString"] = monitor.name + + val responseMap = getAlerts(inputMap).asMap() + val alerts = (responseMap["alerts"] as ArrayList>) + alerts.forEach { + assertTrue(it["error_message"] == null) + } + } + + @AwaitsFix(bugUrl = "") + fun `test monitor run generate lock and monitor delete removes lock`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + OpenSearchTestCase.waitUntil({ + val response = client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME) + return@waitUntil (response.restStatus().status == 200) + }, 240, TimeUnit.SECONDS) + + var response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search") + var responseMap = entityAsMap(response) + var noOfLocks = ((responseMap["hits"] as Map)["hits"] as List).size + assertEquals(1, noOfLocks) + + deleteMonitor(monitor) + refreshIndex(LockService.LOCK_INDEX_NAME) + response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search") + responseMap = entityAsMap(response) + noOfLocks = ((responseMap["hits"] as Map)["hits"] as List).size + assertEquals(0, noOfLocks) + } + fun `test execute monitor with tag as trigger condition generates alerts and findings`() { val testIndex = createTestIndex() val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 9cd2c5e26..eba3533dd 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -37,6 +37,8 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.junit.annotations.TestLogging import java.time.Instant +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit import java.util.Collections import java.util.Locale @@ -1185,4 +1187,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() { val findings = searchFindings(monitor.copy(id = monitorResponse.id)) assertEquals("Findings saved for test monitor", 1, findings.size) } + + fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "__lag-monitor-test__", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = false, + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + createWorkflow( + randomWorkflow( + monitorIds = listOf(monitor.id), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + ) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + Thread.sleep(240000) + + val alerts = searchAlerts(monitor) + alerts.forEach { + assertTrue(it.errorMessage == null) + } + } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt new file mode 100644 index 000000000..706bd0987 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt @@ -0,0 +1,117 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.core.lock + +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.core.xcontent.XContentParserUtils +import org.opensearch.index.seqno.SequenceNumbers +import java.io.IOException +import java.time.Instant + +class LockModel( + val lockId: String, + val scheduledJobId: String, + val lockTime: Instant, + val released: Boolean, + val seqNo: Long, + val primaryTerm: Long +) : ToXContentObject { + + constructor( + copyLock: LockModel, + seqNo: Long, + primaryTerm: Long + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + copyLock.lockTime, + copyLock.released, + seqNo, + primaryTerm + ) + + constructor( + copyLock: LockModel, + released: Boolean + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + copyLock.lockTime, + released, + copyLock.seqNo, + copyLock.primaryTerm + ) + + constructor( + copyLock: LockModel, + updateLockTime: Instant, + released: Boolean + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + updateLockTime, + released, + copyLock.seqNo, + copyLock.primaryTerm + ) + + constructor( + scheduledJobId: String, + lockTime: Instant, + released: Boolean + ) : this ( + generateLockId(scheduledJobId), + scheduledJobId, + lockTime, + released, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(SCHEDULED_JOB_ID, scheduledJobId) + .field(LOCK_TIME, lockTime.epochSecond) + .field(RELEASED, released) + .endObject() + return builder + } + + companion object { + const val SCHEDULED_JOB_ID = "scheduled_job_id" + const val LOCK_TIME = "lock_time" + const val RELEASED = "released" + + fun generateLockId(scheduledJobId: String): String { + return "$scheduledJobId-lock" + } + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): LockModel { + lateinit var scheduledJobId: String + lateinit var lockTime: Instant + var released: Boolean = false + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SCHEDULED_JOB_ID -> scheduledJobId = xcp.text() + LOCK_TIME -> lockTime = Instant.ofEpochSecond(xcp.longValue()) + RELEASED -> released = xcp.booleanValue() + } + } + return LockModel(generateLockId(scheduledJobId), scheduledJobId, lockTime, released, seqNo, primaryTerm) + } + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt new file mode 100644 index 000000000..35618e156 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt @@ -0,0 +1,311 @@ +package org.opensearch.alerting.core.lock + +import org.apache.logging.log4j.LogManager +import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.DocWriteResponse +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.update.UpdateRequest +import org.opensearch.action.update.UpdateResponse +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.core.action.ActionListener +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.engine.DocumentMissingException +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.index.seqno.SequenceNumbers +import java.io.IOException +import java.time.Instant + +private val log = LogManager.getLogger(LockService::class.java) + +class LockService(private val client: Client, private val clusterService: ClusterService) { + private var testInstant: Instant? = null + + companion object { + const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock" + + @JvmStatic + fun lockMapping(): String? { + return LockService::class.java.classLoader.getResource("mappings/opensearch-alerting-config-lock.json") + ?.readText() + } + } + + fun lockIndexExist(): Boolean { + return clusterService.state().routingTable().hasIndex(LOCK_INDEX_NAME) + } + + fun acquireLock( + scheduledJob: ScheduledJob, + listener: ActionListener + ) { + val scheduledJobId = scheduledJob.id + acquireLockWithId(scheduledJobId, listener) + } + + fun acquireLockWithId( + scheduledJobId: String, + listener: ActionListener + ) { + val lockId = LockModel.generateLockId(scheduledJobId) + createLockIndex( + object : ActionListener { + override fun onResponse(created: Boolean) { + if (created) { + try { + findLock( + lockId, + object : ActionListener { + override fun onResponse(existingLock: LockModel?) { + if (existingLock != null) { + if (isLockReleased(existingLock)) { + log.debug("lock is released or expired: {}", existingLock) + val updateLock = LockModel(existingLock, getNow(), false) + updateLock(updateLock, listener) + } else { + log.debug("Lock is NOT released or expired. {}", existingLock) + listener.onResponse(null) + } + } else { + val tempLock = LockModel(scheduledJobId, getNow(), false) + log.debug("Lock does not exist. Creating new lock {}", tempLock) + createLock(tempLock, listener) + } + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } catch (e: VersionConflictEngineException) { + log.debug("could not acquire lock {}", e.message) + listener.onResponse(null) + } + } else { + listener.onResponse(null) + } + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } + + private fun createLock( + tempLock: LockModel, + listener: ActionListener + ) { + try { + val request = IndexRequest(LOCK_INDEX_NAME).id(tempLock.lockId) + .source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) + .setIfPrimaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + .create(true) + client.index( + request, + object : ActionListener { + override fun onResponse(response: IndexResponse) { + listener.onResponse(LockModel(tempLock, response.seqNo, response.primaryTerm)) + } + + override fun onFailure(e: Exception) { + if (e is VersionConflictEngineException) { + log.debug("Lock is already created. {}", e.message) + listener.onResponse(null) + return + } + listener.onFailure(e) + } + } + ) + } catch (ex: IOException) { + log.error("IOException occurred creating lock", ex) + listener.onFailure(ex) + } + } + + private fun updateLock( + updateLock: LockModel, + listener: ActionListener + ) { + try { + val updateRequest = UpdateRequest().index(LOCK_INDEX_NAME) + .id(updateLock.lockId) + .setIfSeqNo(updateLock.seqNo) + .setIfPrimaryTerm(updateLock.primaryTerm) + .doc(updateLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .fetchSource(true) + + client.update( + updateRequest, + object : ActionListener { + override fun onResponse(response: UpdateResponse) { + listener.onResponse(LockModel(updateLock, response.seqNo, response.primaryTerm)) + } + + override fun onFailure(e: Exception) { + if (e is VersionConflictEngineException) { + log.debug("could not acquire lock {}", e.message) + } + if (e is DocumentMissingException) { + log.debug( + "Document is deleted. This happens if the job is already removed and" + " this is the last run." + "{}", + e.message + ) + } + if (e is IOException) { + log.error("IOException occurred updating lock.", e) + } + listener.onResponse(null) + } + } + ) + } catch (ex: IOException) { + log.error("IOException occurred updating lock.", ex) + listener.onResponse(null) + } + } + + fun findLock( + lockId: String, + listener: ActionListener + ) { + val getRequest = GetRequest(LOCK_INDEX_NAME).id(lockId) + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + listener.onResponse(null) + } else { + try { + val parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.sourceAsString) + parser.nextToken() + listener.onResponse(LockModel.parse(parser, response.seqNo, response.primaryTerm)) + } catch (e: IOException) { + log.error("IOException occurred finding lock", e) + listener.onResponse(null) + } + } + } + + override fun onFailure(e: Exception) { + log.error("Exception occurred finding lock", e) + listener.onFailure(e) + } + } + ) + } + + fun release( + lock: LockModel?, + listener: ActionListener + ) { + if (lock == null) { + log.debug("Lock is null. Nothing to release.") + listener.onResponse(false) + } else { + log.debug("Releasing lock: {}", lock) + val lockToRelease = LockModel(lock, true) + updateLock( + lockToRelease, + object : ActionListener { + override fun onResponse(releasedLock: LockModel?) { + listener.onResponse(releasedLock != null) + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } + } + + fun deleteLock( + lockId: String, + listener: ActionListener + ) { + val deleteRequest = DeleteRequest(LOCK_INDEX_NAME).id(lockId) + client.delete( + deleteRequest, + object : ActionListener { + override fun onResponse(response: DeleteResponse) { + listener.onResponse( + response.result == DocWriteResponse.Result.DELETED || response.result == DocWriteResponse.Result.NOT_FOUND + ) + } + + override fun onFailure(e: Exception) { + if (e is IndexNotFoundException || e.cause is IndexNotFoundException) { + log.debug("Index is not found to delete lock. {}", e.message) + listener.onResponse(true) + } else { + listener.onFailure(e) + } + } + } + ) + } + + private fun createLockIndex(listener: ActionListener) { + if (lockIndexExist()) { + listener.onResponse(true) + } else { + val indexRequest = CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()) + .settings(Settings.builder().put("index.hidden", true).build()) + client.admin().indices().create( + indexRequest, + object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + listener.onResponse(response.isAcknowledged) + } + + override fun onFailure(ex: Exception) { + log.error("Failed to update config index schema", ex) + if (ex is ResourceAlreadyExistsException || ex.cause is ResourceAlreadyExistsException + ) { + listener.onResponse(true) + } else { + listener.onFailure(ex) + } + } + } + ) + } + } + + private fun isLockReleased(lock: LockModel): Boolean { + return lock.released + } + + private fun getNow(): Instant { + return if (testInstant != null) { + testInstant!! + } else { + Instant.now() + } + } + + fun setTime(testInstant: Instant) { + this.testInstant = testInstant + } +} diff --git a/core/src/main/resources/mappings/opensearch-alerting-config-lock.json b/core/src/main/resources/mappings/opensearch-alerting-config-lock.json new file mode 100644 index 000000000..401374a8f --- /dev/null +++ b/core/src/main/resources/mappings/opensearch-alerting-config-lock.json @@ -0,0 +1,18 @@ +{ + "dynamic": "strict", + "properties": { + "scheduled_job_id": { + "type": "keyword" + }, + "lock_time": { + "type": "date", + "format": "epoch_second" + }, + "lock_duration_seconds": { + "type": "long" + }, + "released": { + "type": "boolean" + } + } +} \ No newline at end of file