From c18b0f84ba4adf79ce10094938b54993318b8e56 Mon Sep 17 00:00:00 2001 From: bowenlan-amzn Date: Tue, 3 Oct 2023 16:53:56 -0700 Subject: [PATCH] backport #965 (#976) Signed-off-by: bowenlan-amzn --- .../snapshotmanagement/SMRunner.kt | 2 +- .../engine/SMStateMachine.kt | 8 +- .../IndexManagementRestTestCase.kt | 58 ++++ .../IndexStateManagementSecurityBehaviorIT.kt | 20 +- .../indexmanagement/SecurityRestTestCase.kt | 7 - ...IndexManagementBackwardsCompatibilityIT.kt | 7 +- .../resthandler/LRONConfigRestTestCase.kt | 17 -- .../IndexStateManagementRestTestCase.kt | 11 + .../action/ActionRetryIT.kt | 2 - .../action/ForceMergeActionIT.kt | 15 +- .../action/IndexStateManagementHistoryIT.kt | 278 ++---------------- .../action/RolloverActionIT.kt | 1 - .../action/RollupActionIT.kt | 52 ++-- .../coordinator/ManagedIndexCoordinatorIT.kt | 1 - .../resthandler/ISMTemplateRestAPIIT.kt | 1 - .../resthandler/RestExplainActionIT.kt | 8 - .../validation/ValidateCloseIT.kt | 66 ----- .../validation/ValidateDeleteIT.kt | 74 ----- .../validation/ValidateForceMergeIT.kt | 96 ------ .../validation/ValidateIndexPriorityIT.kt | 66 ----- .../validation/ValidateOpenIT.kt | 73 ----- .../validation/ValidateReadOnlyIT.kt | 71 ----- .../validation/ValidateReadWriteIT.kt | 80 ----- .../validation/ValidateReplicaCountIT.kt | 70 ----- .../validation/ValidateRolloverIT.kt | 218 -------------- .../validation/ValidateRolloverTests.kt | 72 ----- .../validation/ValidateSnapshotIT.kt | 73 ----- .../validation/ValidateTransitionIT.kt | 69 ----- .../rollup/RollupRestTestCase.kt | 57 ---- .../rollup/runner/RollupRunnerIT.kt | 4 - .../snapshotmanagement/SMRunnerIT.kt | 4 +- .../SnapshotManagementRestTestCase.kt | 45 +++ worksheets/sm/create.http | 18 +- 33 files changed, 195 insertions(+), 1449 deletions(-) delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateCloseIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDeleteIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateForceMergeIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateIndexPriorityIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateOpenIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadOnlyIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadWriteIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReplicaCountIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateSnapshotIT.kt delete mode 100644 src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateTransitionIT.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt index 10636891d..b15491c1e 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunner.kt @@ -70,7 +70,7 @@ object SMRunner : ) override fun runJob(job: ScheduledJobParameter, context: JobExecutionContext) { - log.debug("Snapshot management running job: $job") + log.debug("Snapshot management running job: {}", job) if (job !is SMPolicy) { throw IllegalArgumentException("Received invalid job type [${job.javaClass.simpleName}] with id [${context.jobId}].") diff --git a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt index 51f4d35a2..058eeb7d5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/snapshotmanagement/engine/SMStateMachine.kt @@ -39,7 +39,7 @@ class SMStateMachine( val indicesManager: IndexManagementIndices, ) { - val log: Logger = LogManager.getLogger("$javaClass [${job.policyName}]") + val log: Logger = LogManager.getLogger(javaClass) lateinit var currentState: SMState fun currentState(currentState: SMState): SMStateMachine { @@ -62,7 +62,7 @@ class SMStateMachine( val prevState = currentState for (nextState in nextStates) { currentState = nextState - log.debug("Start executing $currentState.") + log.debug("Start executing {}.", currentState) log.debug( "User and roles string from thread context: ${threadPool.threadContext.getTransient( ConfigConstants.OPENSEARCH_SECURITY_USER_INFO_THREAD_CONTEXT @@ -99,7 +99,7 @@ class SMStateMachine( break } is SMResult.Stay -> { - log.debug("State [$currentState] has not finished.") + log.debug("State [{}] has not finished.", currentState) updateMetadata( result.metadataToSave .setCurrentState(prevState) @@ -200,7 +200,7 @@ class SMStateMachine( suspend fun updateMetadata(md: SMMetadata) { indicesManager.checkAndUpdateIMConfigIndex(log) try { - log.debug("Update metadata: $md") + log.debug("Update metadata: {}", md) if (md == metadata) { log.debug("Metadata not change, so don't need to update") return diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt index 34647a0b5..1428cfea4 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexManagementRestTestCase.kt @@ -27,9 +27,14 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_HIDDEN import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.MediaType +import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import java.io.IOException import java.nio.file.Files import java.util.Date +import java.time.Duration +import java.time.Instant +import java.util.* import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory @@ -65,6 +70,24 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { } } + @Before + fun setDebugLogLevel() { + client().makeRequest( + "PUT", "_cluster/settings", + StringEntity( + """ + { + "transient": { + "logger.org.opensearch.indexmanagement":"DEBUG", + "logger.org.opensearch.jobscheduler":"DEBUG" + } + } + """.trimIndent(), + ContentType.APPLICATION_JSON + ) + ) + } + protected val isDebuggingTest = DisableOnDebug(null).isDebugging protected val isDebuggingRemoteCluster = System.getProperty("cluster.debug", "false")!!.toBoolean() @@ -172,6 +195,41 @@ abstract class IndexManagementRestTestCase : ODFERestTestCase() { } } + protected fun updateRollupStartTime(update: Rollup, desiredStartTimeMillis: Long? = null) { + // Before updating start time of a job always make sure there are no unassigned shards that could cause the config + // index to move to a new node and negate this forced start + if (isMultiNode) { + waitFor { + try { + client().makeRequest("GET", "_cluster/allocation/explain") + fail("Expected 400 Bad Request when there are no unassigned shards to explain") + } catch (e: ResponseException) { + assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + } + val intervalSchedule = (update.jobSchedule as IntervalSchedule) + val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() + val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis) + val waitForActiveShards = if (isMultiNode) "all" else "1" + // TODO flaky: Add this log to confirm this update is missed by job scheduler + // This miss is because shard remove, job scheduler deschedule on the original node and reschedule on another node + // However the shard comes back, and job scheduler deschedule on the another node and reschedule on the original node + // During this period, this update got missed + // Since from the log, this happens very fast (within 0.1~0.2s), the above cluster explain may not have the granularity to catch this. + logger.info("Update rollup start time to $startTimeMillis") + val response = client().makeRequest( + "POST", "${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX}/_update/${update.id}?wait_for_active_shards=$waitForActiveShards&refresh=true", + StringEntity( + "{\"doc\":{\"rollup\":{\"schedule\":{\"interval\":{\"start_time\":" + + "\"$startTimeMillis\"}}}}}", + ContentType.APPLICATION_JSON + ) + ) + + assertEquals("Request failed", RestStatus.OK, response.restStatus()) + } + override fun preserveIndicesUponCompletion(): Boolean = true companion object { val isMultiNode = System.getProperty("cluster.number_of_nodes", "1").toInt() > 1 diff --git a/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt index 103f95c0f..00bbd543f 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/IndexStateManagementSecurityBehaviorIT.kt @@ -273,8 +273,9 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() { ) } - private fun assertIndexRolledUp(indexName: String, policyId: String, rollup: ISMRollup) { - val rollupId = rollup.toRollup(indexName).id + private fun assertIndexRolledUp(indexName: String, policyId: String, ismRollup: ISMRollup) { + val rollup = ismRollup.toRollup(indexName) + val rollupId = rollup.id val managedIndexConfig = getExistingManagedIndexConfig(indexName) // Change the start time so that the policy will be initialized. @@ -290,9 +291,14 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() { ) } - Thread.sleep(60000) + updateRollupStartTime(rollup) + waitFor(timeout = Instant.ofEpochSecond(60)) { + val rollupJob = getRollup(rollupId = rollupId) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } - // Change the start time so that the rollup action will be attempted. updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals( @@ -300,11 +306,5 @@ class IndexStateManagementSecurityBehaviorIT : SecurityRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - val rollupJob = getRollup(rollupId = rollupId) - waitFor { - assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) - } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt index 5785a8f57..cf5709ccd 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/SecurityRestTestCase.kt @@ -54,9 +54,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { client: RestClient ) = super.createRollup(rollup, rollupId, refresh, client) - fun updateRollupStartTimeExt(update: Rollup, desiredStartTimeMillis: Long? = null) = - super.updateRollupStartTime(update, desiredStartTimeMillis) - fun getRollupMetadataExt( metadataId: String, refresh: Boolean = true, @@ -165,10 +162,6 @@ abstract class SecurityRestTestCase : IndexManagementRestTestCase() { return executeRequest(request, expectedStatus, client) } - protected fun updateRollupStartTime(update: Rollup, desiredStartTimeMillis: Long? = null) { - RollupRestTestCaseSecurityExtension.updateRollupStartTimeExt(update, desiredStartTimeMillis) - } - protected fun getRollupMetadata( metadataId: String, refresh: Boolean = true, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt index d38401711..733d52674 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/bwc/IndexManagementBackwardsCompatibilityIT.kt @@ -19,6 +19,7 @@ import org.opensearch.indexmanagement.makeRequest import org.opensearch.indexmanagement.opensearchapi.string import org.opensearch.indexmanagement.util.NO_ID import org.opensearch.core.rest.RestStatus +import org.opensearch.indexmanagement.waitFor import org.opensearch.search.builder.SearchSourceBuilder class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() { @@ -60,7 +61,9 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() { createBasicPolicy() verifyPolicyExists(LEGACY_POLICY_BASE_URI) - verifyPolicyOnIndex(LEGACY_ISM_BASE_URI) + waitFor { + verifyPolicyOnIndex(LEGACY_ISM_BASE_URI) + } } ClusterType.MIXED -> { assertTrue(pluginNames.contains("opensearch-index-management")) @@ -137,7 +140,6 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() { val createdVersion = responseBody["_version"] as Int assertNotEquals("Create policy response is missing id", NO_ID, createdId) assertTrue("Create policy response has incorrect version", createdVersion > 0) - Thread.sleep(10000) } @Throws(Exception::class) @@ -164,6 +166,7 @@ class IndexManagementBackwardsCompatibilityIT : IndexManagementRestTestCase() { assertEquals("Explain Index failed", RestStatus.OK, getResponse.restStatus()) val responseBody = getResponse.asMap() + logger.info("Response body: $responseBody") assertTrue("Test index does not exist", responseBody.containsKey(INDEX_NAME)) val responsePolicy = responseBody[INDEX_NAME] as Map val responsePolicyId = responsePolicy["policy_id"] diff --git a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt index dad6a178b..7b5012541 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/controlcenter/notification/resthandler/LRONConfigRestTestCase.kt @@ -24,7 +24,6 @@ import org.opensearch.core.rest.RestStatus abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() { @Before fun prepareForIT() { - setDebugLogLevel() /* init cluster node ids in integ test */ initNodeIdsInRestIT(client()) } @@ -49,22 +48,6 @@ abstract class LRONConfigRestTestCase : IndexManagementRestTestCase() { return client().makeRequest("POST", IndexManagementPlugin.LRON_BASE_URI, emptyMap(), lronConfig.toHttpEntity()) } - private fun setDebugLogLevel() { - client().makeRequest( - "PUT", "_cluster/settings", - StringEntity( - """ - { - "transient": { - "logger.org.opensearch.indexmanagement.controlcenter.notification":"DEBUG" - } - } - """.trimIndent(), - ContentType.APPLICATION_JSON - ) - ) - } - protected fun LRONConfig.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), ContentType.APPLICATION_JSON) companion object { diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index f7b31f8ca..2cf8d1ee7 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -533,6 +533,17 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() return myIndex["state"] as String } + @Suppress("UNCHECKED_CAST") + protected fun getIndexNamesOfPattern(pattern: String): Set { + val request = Request("GET", "/_cluster/state") + val response = client().performRequest(request) + + val responseMap = response.asMap() + val metadata = responseMap["metadata"] as Map + val indexMetaData = metadata["indices"] as Map + return indexMetaData.filter { it.key.startsWith(pattern) }.keys + } + @Suppress("UNCHECKED_CAST") protected fun getIndexBlocksWriteSetting(indexName: String): String { val indexSettings = getIndexSettings(indexName) as Map>> diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt index 2618dc549..f6970b754 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ActionRetryIT.kt @@ -24,7 +24,6 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { * We are forcing RollOver to fail in this Integ test. */ fun `test failed action`() { - disableValidationService() val testPolicy = """ {"policy":{"description":"Default policy","default_state":"Ingest","states":[ {"name":"Ingest","actions":[{"retry":{"count":2,"backoff":"constant","delay":"1s"},"rollover":{"min_doc_count":100}}],"transitions":[{"state_name":"Search"}]}, @@ -96,7 +95,6 @@ class ActionRetryIT : IndexStateManagementRestTestCase() { } fun `test exponential backoff`() { - disableValidationService() val testPolicy = """ {"policy":{"description":"Default policy","default_state":"Ingest","states":[ {"name":"Ingest","actions":[{"retry":{"count":2,"backoff":"exponential","delay":"1m"},"rollover":{"min_doc_count":100}}],"transitions":[{"state_name":"Search"}]}, diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt index fca73466a..67567be71 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/ForceMergeActionIT.kt @@ -29,7 +29,6 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Create a Policy with one State that only preforms a force_merge Action val forceMergeActionConfig = ForceMergeAction(maxNumSegments = 1, index = 0) val states = listOf(State("ForceMergeState", listOf(forceMergeActionConfig), listOf())) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -45,7 +44,6 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } val managedIndexConfig = getExistingManagedIndexConfig(indexName) @@ -53,17 +51,14 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Will change the startTime each execution so that it triggers in 2 seconds // First execution: Policy is initialized updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } // Second execution: Index is set to read-only for force_merge updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - // verify we set maxNumSegments in action properties when kicking off force merge waitFor { assertEquals( @@ -75,10 +70,9 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } // verify we reset actionproperties at end of forcemerge - waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) } + waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.maxNumSegments) } // index should still be readonly after force merge finishes waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } } @@ -106,7 +100,6 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created insertSampleData(indexName, 3, 1000) - waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } // Set index to read-only @@ -117,26 +110,22 @@ class ForceMergeActionIT : IndexStateManagementRestTestCase() { // Will change the startTime each execution so that it triggers in 2 seconds // First execution: Policy is initialized updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } // Second execution: Index was already read-only and should remain so for force_merge updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(AttemptSetReadOnlyStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } // Third execution: Force merge operation is kicked off updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(AttemptCallForceMergeStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(WaitForForceMergeStep.getSuccessMessage(indexName), getExplainManagedIndexMetaData(indexName).info?.get("message")) } assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) + waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.maxNumSegments) } assertEquals("true", getIndexBlocksWriteSetting(indexName)) } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt index ebb9c6be4..a592d2c18 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/IndexStateManagementHistoryIT.kt @@ -34,7 +34,6 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { val states = listOf( State("ReadOnlyState", listOf(actionConfig), listOf()) ) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -51,23 +50,18 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { val managedIndexConfig = getExistingManagedIndexConfig(indexName) - // Change the start time so the job will trigger in 2 seconds. + // First run updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. + // Second run updateManagedIndexConfigStartTime(managedIndexConfig) - val historySearchResponse: SearchResponse = waitFor { val historySearchResponse = getHistorySearchResponse(indexName) assertEquals(2, historySearchResponse.hits.totalHits!!.value) historySearchResponse } - val actualHistory = getLatestHistory(historySearchResponse) - val expectedHistory = ManagedIndexMetaData( indexName, getUuid(indexName), @@ -84,20 +78,18 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyRetryInfo = PolicyRetryInfoMetaData(false, 0), info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) - assertEquals(expectedHistory, actualHistory) waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } } - fun `test short retention period and history enabled`() { + fun `test history write index cannot be deleted if enabled, can be deleted if disabled`() { val indexName = "${testIndexName}_index_2" val policyID = "${testIndexName}_testPolicyName_2" val actionConfig = ReadOnlyAction(0) val states = listOf( State("ReadOnlyState", listOf(actionConfig), listOf()) ) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -111,97 +103,26 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { createPolicy(policy, policyID) createIndex(indexName, policyID) - restAdminSettings() updateClusterSetting(ManagedIndexSettings.HISTORY_ENABLED.key, "true") - updateClusterSetting(ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD.key, "5s") - updateClusterSetting(ManagedIndexSettings.HISTORY_RETENTION_PERIOD.key, "5s") + updateClusterSetting(ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD.key, "2s") + updateClusterSetting(ManagedIndexSettings.HISTORY_RETENTION_PERIOD.key, "1s") val managedIndexConfig = getExistingManagedIndexConfig(indexName) - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. + // First run updateManagedIndexConfigStartTime(managedIndexConfig) - - val historySearchResponse: SearchResponse = waitFor { - val historySearchResponse = getHistorySearchResponse(indexName) - assertEquals(2, historySearchResponse.hits.totalHits!!.value) - historySearchResponse + waitFor { + assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - val actualHistory = getLatestHistory(historySearchResponse) - - val expectedHistory = ManagedIndexMetaData( - indexName, - getUuid(indexName), - policyID, - actualHistory.policySeqNo, - policyPrimaryTerm = actualHistory.policyPrimaryTerm, - policyCompleted = null, - rolledOver = null, - indexCreationDate = actualHistory.indexCreationDate, - transitionTo = null, - stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), - actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory.actionMetaData!!.startTime, 0, false, 0, 0, null), - stepMetaData = StepMetaData("set_read_only", actualHistory.stepMetaData!!.startTime, Step.StepStatus.COMPLETED), - policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) - ) - - assertEquals(expectedHistory, actualHistory) - - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } - } - - fun `test small doc count rolledover index`() { - val indexName = "${testIndexName}_index_3" - val policyID = "${testIndexName}_testPolicyNam_3" - val actionConfig = ReadOnlyAction(0) - val states = listOf( - State("ReadOnlyState", listOf(actionConfig), listOf()) - ) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - restAdminSettings() - updateClusterSetting(ManagedIndexSettings.HISTORY_ENABLED.key, "true") - updateClusterSetting(ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD.key, "5s") - updateClusterSetting(ManagedIndexSettings.HISTORY_MAX_DOCS.key, "1") - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. + // Second run updateManagedIndexConfigStartTime(managedIndexConfig) - val historySearchResponse: SearchResponse = waitFor { val historySearchResponse = getHistorySearchResponse(indexName) assertEquals(2, historySearchResponse.hits.totalHits!!.value) historySearchResponse } - val actualHistory = getLatestHistory(historySearchResponse) - val expectedHistory = ManagedIndexMetaData( indexName, getUuid(indexName), @@ -218,180 +139,20 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { policyRetryInfo = PolicyRetryInfoMetaData(false, 0), info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) ) - assertEquals(expectedHistory, actualHistory) - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } - } - - fun `test short retention period and rolledover index`() { - val indexName = "${testIndexName}_index_4" - val policyID = "${testIndexName}_testPolicyNam_4" - val actionConfig = ReadOnlyAction(0) - val states = listOf( - State("ReadOnlyState", listOf(actionConfig), listOf()) - ) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - restAdminSettings() - resetHistorySetting() - - updateClusterSetting(ManagedIndexSettings.HISTORY_ENABLED.key, "true") - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - val historySearchResponse: SearchResponse = waitFor { - val historySearchResponse = getHistorySearchResponse(indexName) - assertEquals(1, historySearchResponse.hits.totalHits!!.value) - historySearchResponse - } - - val actualHistory = getLatestHistory(historySearchResponse) - - val expectedHistory = ManagedIndexMetaData( - indexName, - getUuid(indexName), - policyID, - actualHistory.policySeqNo, - policyPrimaryTerm = actualHistory.policyPrimaryTerm, - policyCompleted = null, - rolledOver = null, - indexCreationDate = actualHistory.indexCreationDate, - transitionTo = null, - stateMetaData = StateMetaData("ReadOnlyState", actualHistory.stateMetaData!!.startTime), - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Successfully initialized policy: $policyID") - ) - - assertEquals(expectedHistory, actualHistory) - - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - val historySearchResponse1: SearchResponse = waitFor { - val historySearchResponse1 = getHistorySearchResponse(indexName) - assertEquals(2, historySearchResponse1.hits.totalHits!!.value) - historySearchResponse1 - } - - val actualHistory1 = getLatestHistory(historySearchResponse1) - - val expectedHistory1 = ManagedIndexMetaData( - indexName, - getUuid(indexName), - policyID, - actualHistory1.policySeqNo, - policyPrimaryTerm = actualHistory1.policyPrimaryTerm, - policyCompleted = null, - rolledOver = null, - indexCreationDate = actualHistory1.indexCreationDate, - transitionTo = null, - stateMetaData = StateMetaData(states[0].name, actualHistory1.stateMetaData!!.startTime), - actionMetaData = ActionMetaData(ReadOnlyAction.name, actualHistory1.actionMetaData!!.startTime, 0, false, 0, 0, null), - stepMetaData = StepMetaData("set_read_only", actualHistory1.stepMetaData!!.startTime, Step.StepStatus.COMPLETED), - policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to SetReadOnlyStep.getSuccessMessage(indexName)) - ) - - assertEquals(expectedHistory1, actualHistory1) - - updateClusterSetting(ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD.key, "2s") + var historyIndexName = getIndexNamesOfPattern(".opendistro-ism-managed-index-history") + assert(historyIndexName.first().endsWith("-1")) + // e.g.: .opendistro-ism-managed-index-history-2023.10.01-1 updateClusterSetting(ManagedIndexSettings.HISTORY_MAX_DOCS.key, "1") - updateClusterSetting(ManagedIndexSettings.HISTORY_RETENTION_PERIOD.key, "1s") - - // After updating settings, ensure all the histories are deleted. waitFor { - val historySearchResponse3 = getHistorySearchResponse(indexName) - assertEquals(0, historySearchResponse3.hits.totalHits!!.value) - } - - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } - } - - fun `test short retention period and history disabled`() { - val indexName = "${testIndexName}_index_5" - val policyID = "${testIndexName}_testPolicyName_5" - val actionConfig = ReadOnlyAction(0) - val states = listOf( - State("ReadOnlyState", listOf(actionConfig), listOf()) - ) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - restAdminSettings() - resetHistorySetting() - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - val historySearchResponse: SearchResponse = waitFor { - val historySearchResponse = getHistorySearchResponse(indexName) - assertEquals(1, historySearchResponse.hits.totalHits!!.value) - historySearchResponse + historyIndexName = getIndexNamesOfPattern(".opendistro-ism-managed-index-history") + assert(historyIndexName.first().endsWith("2")) + // e.g.: .opendistro-ism-managed-index-history-2023.10.01-000002 } - val actualHistory = getLatestHistory(historySearchResponse) - - val expectedHistory = ManagedIndexMetaData( - indexName, - getUuid(indexName), - policyID, - actualHistory.policySeqNo, - policyPrimaryTerm = actualHistory.policyPrimaryTerm, - policyCompleted = null, - rolledOver = null, - indexCreationDate = actualHistory.indexCreationDate, - transitionTo = null, - stateMetaData = StateMetaData(name = states[0].name, startTime = actualHistory.stateMetaData!!.startTime), - actionMetaData = null, - stepMetaData = null, - policyRetryInfo = PolicyRetryInfoMetaData(false, 0), - info = mapOf("message" to "Successfully initialized policy: $policyID") - ) - - assertEquals(expectedHistory, actualHistory) - updateClusterSetting(ManagedIndexSettings.HISTORY_ENABLED.key, "false") - updateClusterSetting(ManagedIndexSettings.HISTORY_RETENTION_PERIOD.key, "1s") - updateClusterSetting(ManagedIndexSettings.HISTORY_ROLLOVER_CHECK_PERIOD.key, "2s") - - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertFalse("History index does exist.", aliasExists(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS)) } - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } } fun `test history shard settings`() { @@ -400,7 +161,6 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { val policyID = "${testIndexName}_shard_settings_1" val actionConfig = ReadOnlyAction(0) val states = listOf(State("ReadOnlyState", listOf(actionConfig), listOf())) - val policy = Policy( id = policyID, description = "$testIndexName description", @@ -413,21 +173,17 @@ class IndexStateManagementHistoryIT : IndexStateManagementRestTestCase() { createPolicy(policy, policyID) createIndex(indexName, policyID) + resetHistorySetting() updateClusterSetting(ManagedIndexSettings.HISTORY_NUMBER_OF_SHARDS.key, "2") updateClusterSetting(ManagedIndexSettings.HISTORY_NUMBER_OF_REPLICAS.key, "3") val managedIndexConfig = getExistingManagedIndexConfig(indexName) - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to wait two cycles. // Change the start time so the job will trigger in 2 seconds. updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { + assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) assertIndexExists(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS) val indexSettings = getIndexSettings(IndexManagementIndices.HISTORY_WRITE_INDEX_ALIAS) val historyIndexName = indexSettings.keys.firstOrNull { it.startsWith(IndexManagementIndices.HISTORY_INDEX_BASE) } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt index 797b5acc9..a2d8b8553 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RolloverActionIT.kt @@ -380,7 +380,6 @@ class RolloverActionIT : IndexStateManagementRestTestCase() { } fun `test rollover pre check`() { - disableValidationService() // index-1 alias x // index-2 alias x is_write_index // manage index-1, expect it fail to rollover diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt index fa9402a3b..422f72877 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt @@ -28,7 +28,6 @@ import org.opensearch.indexmanagement.rollup.model.metric.Sum import org.opensearch.indexmanagement.rollup.model.metric.ValueCount import org.opensearch.indexmanagement.rollup.toJsonString import org.opensearch.indexmanagement.waitFor -import java.lang.Thread.sleep import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Locale @@ -150,9 +149,6 @@ class RollupActionIT : IndexStateManagementRestTestCase() { fun `test data stream rollup action with scripted targetIndex`() { val dataStreamName = "${testIndexName}_data_stream" val policyID = "${testIndexName}_rollup_policy" - - sleep(10000) - val rollup = ISMRollup( description = "data stream rollup", targetIndex = "rollup_{{ctx.source_index}}", @@ -218,7 +214,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { fun `test rollup action failure`() { val indexName = "${testIndexName}_index_failure" val policyID = "${testIndexName}_policy_failure" - val rollup = ISMRollup( + val ismRollup = ISMRollup( description = "basic search test", targetIndex = "target_rollup_search", pageSize = 100, @@ -237,8 +233,9 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ) ) ) - val rollupId = rollup.toRollup(indexName).id - val actionConfig = RollupAction(rollup, 0) + val rollup = ismRollup.toRollup(indexName) + val rollupId = rollup.id + val actionConfig = RollupAction(ismRollup, 0) val states = listOf( State("rollup", listOf(actionConfig), listOf()) ) @@ -263,7 +260,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - // Change the start time so we attempt to create rollup step will execute + // Change the start time, so we attempt to create rollup step will execute updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals( @@ -272,8 +269,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ) } - Thread.sleep(60000) - + updateRollupStartTime(rollup) // Change the start time so wait for rollup step will execute updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { @@ -332,7 +328,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - // Change the start time so we attempt to create rollup step will execute + // Change the start time, so we attempt to create rollup step will execute updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals( @@ -345,7 +341,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { fun `test rollup action failure and retry failed step`() { val indexName = "${testIndexName}_index_retry" val policyID = "${testIndexName}_policy_retry" - val rollup = ISMRollup( + val ismRollup = ISMRollup( description = "basic search test", targetIndex = "target_rollup_search", pageSize = 100, @@ -364,10 +360,11 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ) ) ) - val rollupId = rollup.toRollup(indexName).id + val rollup = ismRollup.toRollup(indexName) + val rollupId = rollup.id val policyString = "{\"policy\":{\"description\":\"$testIndexName description\",\"default_state\":\"rollup\",\"states\":[{\"name\":\"rollup\"," + "\"actions\":[{\"retry\":{\"count\":2,\"backoff\":\"constant\",\"delay\":\"10ms\"},\"rollup\":{\"ism_rollup\":" + - "${rollup.toJsonString()}}}],\"transitions\":[]}]}}" + "${ismRollup.toJsonString()}}}],\"transitions\":[]}]}}" val sourceIndexMappingString = "\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " + "\"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " + @@ -381,7 +378,7 @@ class RollupActionIT : IndexStateManagementRestTestCase() { updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - // Change the start time so we attempt to create rollup step will execute + // Change the start time, so we attempt to create rollup step will execute updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals( @@ -399,8 +396,8 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ) } - // Wait for few seconds and change start time so wait for rollup step will execute again - job will be failed - Thread.sleep(60000) + // Wait for rollup step job failed + updateRollupStartTime(rollup) updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals( @@ -410,8 +407,9 @@ class RollupActionIT : IndexStateManagementRestTestCase() { } } - private fun assertIndexRolledUp(indexName: String, policyId: String, rollup: ISMRollup) { - val rollupId = rollup.toRollup(indexName).id + private fun assertIndexRolledUp(indexName: String, policyId: String, ismRollup: ISMRollup) { + val rollup = ismRollup.toRollup(indexName) + val rollupId = rollup.id val managedIndexConfig = getExistingManagedIndexConfig(indexName) // Change the start time so that the policy will be initialized. @@ -427,9 +425,14 @@ class RollupActionIT : IndexStateManagementRestTestCase() { ) } - Thread.sleep(60000) + updateRollupStartTime(rollup) + waitFor(timeout = Instant.ofEpochSecond(60)) { + val rollupJob = getRollup(rollupId = rollupId) + assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) + val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) + assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) + } - // Change the start time so that the rollup action will be attempted. updateManagedIndexConfigStartTime(managedIndexConfig) waitFor { assertEquals( @@ -437,12 +440,5 @@ class RollupActionIT : IndexStateManagementRestTestCase() { getExplainManagedIndexMetaData(indexName).info?.get("message") ) } - - val rollupJob = getRollup(rollupId = rollupId) - waitFor { - assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) - val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) - assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status) - } } } diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt index 4f7f4e7da..f42b55a56 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt @@ -63,7 +63,6 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { } fun `test managed index metadata is cleaned up after removing policy`() { - disableValidationService() val policy = createRandomPolicy() val (index) = createIndex(policyID = policy.id) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt index 63af39f5b..4fad90ebb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt @@ -79,7 +79,6 @@ class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { } fun `test ism template managing index`() { - disableValidationService() val indexName1 = "log-000001" val indexName2 = "log-000002" val indexName3 = "log-000003" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt index 7138a903a..76dc83eb5 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/resthandler/RestExplainActionIT.kt @@ -28,7 +28,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) fun `test single index`() { - disableValidationService() val indexName = "${testIndexName}_movies" createIndex(indexName, null) val expected = mapOf( @@ -52,7 +51,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { } fun `test two indices, one managed one not managed`() { - disableValidationService() // explicitly asks for un-managed index, will return policy_id as null val indexName1 = "${testIndexName}_managed" val indexName2 = "${testIndexName}_not_managed" @@ -82,7 +80,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { } fun `test two indices, one managed one not managed explain all`() { - disableValidationService() // explain all returns only managed indices val indexName1 = "${testIndexName}_managed" val indexName2 = "${testIndexName}_not_managed" @@ -107,7 +104,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { } fun `test index pattern`() { - disableValidationService() val indexName1 = "${testIndexName}_pattern" val indexName2 = "${indexName1}_2" val indexName3 = "${indexName1}_3" @@ -145,7 +141,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { } fun `test search query string`() { - disableValidationService() val indexName1 = "$testIndexName-search-query-string" val indexName2 = "$indexName1-testing-2" val indexName3 = "$indexName1-testing-3" @@ -254,7 +249,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { } fun `test attached policy`() { - disableValidationService() val indexName = "${testIndexName}_watermelon" val policy = createRandomPolicy() createIndex(indexName, policy.id) @@ -294,7 +288,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { } fun `test failed policy`() { - disableValidationService() val indexName = "${testIndexName}_melon" val policy = createRandomPolicy() createIndex(indexName, policy.id) @@ -337,7 +330,6 @@ class RestExplainActionIT : IndexStateManagementRestTestCase() { } fun `test show_applied_policy query parameter`() { - disableValidationService() val indexName = "${testIndexName}_show_applied_policy" val policy = createRandomPolicy() createIndex(indexName, policy.id) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateCloseIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateCloseIT.kt deleted file mode 100644 index 091d948de..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateCloseIT.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.CloseAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateCloseIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test basic close action validation`() { - enableValidationService() - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = CloseAction(0) - val states = listOf( - State("CloseState", listOf(actionConfig), listOf()) - ) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - assertEquals("open", getIndexState(indexName)) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals("close", getIndexState(indexName)) } - - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index close action validation status is PASSED.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDeleteIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDeleteIT.kt deleted file mode 100644 index 16e063270..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateDeleteIT.kt +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.DeleteAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateDeleteIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test delete index is write index`() { - enableValidationService() - val index1 = "index-1" - val alias1 = "x" - val policyID = "${testIndexName}_precheck" - val actionConfig = DeleteAction(0) - actionConfig.configRetry = ActionRetry(0) - val states = listOf(State(name = "DeleteAction", actions = listOf(actionConfig), transitions = listOf())) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(index1, policyID) - changeAlias(index1, alias1, "add", true) - updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1) - - val managedIndexConfig = getExistingManagedIndexConfig(index1) - - // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) } - // waitFor { assertIndexExists(index1) } - - // Need to speed up to second execution where it will trigger the first execution of the action - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - val data = getExplainValidationResult(index1) - assertEquals( - "Index delete action validation status is RE_VALIDATING.", - Validate.ValidationStatus.RE_VALIDATING, - data.validationStatus - ) - } - waitFor { - val data = getExplainValidationResult(index1) - assertEquals( - "Index delete action validation message is index is write index.", - ValidateDelete.getFailedIsWriteIndexMessage(index1), - data.validationMessage - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateForceMergeIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateForceMergeIT.kt deleted file mode 100644 index b44ad9ac3..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateForceMergeIT.kt +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateForceMergeIT : IndexStateManagementRestTestCase() { - - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test basic workflow`() { - enableValidationService() - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - - // Create a Policy with one State that only preforms a force_merge Action - val forceMergeActionConfig = org.opensearch.indexmanagement.indexstatemanagement.action.ForceMergeAction(maxNumSegments = 1, index = 0) - val states = listOf(State("ForceMergeState", listOf(forceMergeActionConfig), listOf())) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - // Add sample data to increase segment count, passing in a delay to ensure multiple segments get created - insertSampleData(indexName, 3, 1000) - - waitFor { assertTrue("Segment count for [$indexName] was less than expected", validateSegmentCount(indexName, min = 2)) } - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Will change the startTime each execution so that it triggers in 2 seconds - // First execution: Policy is initialized - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Second execution: Index is set to read-only for force_merge - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } - - // Third execution: Force merge operation is kicked off - updateManagedIndexConfigStartTime(managedIndexConfig) - - // verify we set maxNumSegments in action properties when kicking off force merge - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index force_merge action validation status is RE_VALIDATING.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - assertEquals( - "Index force_merge action validation status is RE_VALIDATING.", - ValidateForceMerge.getValidationPassedMessage(indexName), - data.validationMessage - ) - } - waitFor { - assertEquals( - "maxNumSegments not set in ActionProperties", - forceMergeActionConfig.maxNumSegments, - getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties?.maxNumSegments - ) - } - - // Fourth execution: Waits for force merge to complete, which will happen in this execution since index is small - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertTrue("Segment count for [$indexName] after force merge is incorrect", validateSegmentCount(indexName, min = 1, max = 1)) } - // verify we reset actionproperties at end of forcemerge - waitFor { assertNull("maxNumSegments was not reset", getExplainManagedIndexMetaData(indexName).actionMetaData?.actionProperties) } - // index should still be readonly after force merge finishes - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateIndexPriorityIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateIndexPriorityIT.kt deleted file mode 100644 index c3a8ce71b..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateIndexPriorityIT.kt +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.IndexPriorityAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateIndexPriorityIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test basic index priority`() { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = IndexPriorityAction(50, 0) - val states = listOf(State(name = "SetPriorityState", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - // Change the runJob start time so the job will trigger in 2 seconds - updateManagedIndexConfigStartTime(managedIndexConfig) - - // ism policy initialized - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // change the runJob start time to change index priority - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals("Index did not set index_priority to ${actionConfig.indexPriority}", actionConfig.indexPriority, getIndexPrioritySetting(indexName)) } - - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index Priority action validation status is PASSED.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateOpenIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateOpenIT.kt deleted file mode 100644 index 0bb00610a..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateOpenIT.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.OpenAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateOpenIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test validate open basic`() { - enableValidationService() - val indexName = "index_1" - val policyID = "${testIndexName}_precheck" - val actionConfig = OpenAction(0) - val states = listOf( - State("OpenState", listOf(actionConfig), listOf()) - ) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - createPolicy(policy, policyID) - createIndex(indexName, policyID) - closeIndex(indexName) - - assertEquals("close", getIndexState(indexName)) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals("open", getIndexState(indexName)) } - - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index open action validation status is PASSED.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadOnlyIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadOnlyIT.kt deleted file mode 100644 index 0bbcf9d81..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadOnlyIT.kt +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.ReadOnlyAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateReadOnlyIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test read_only validation`() { - enableValidationService() - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = ReadOnlyAction(0) - val states = listOf( - State("ReadOnlyState", listOf(actionConfig), listOf()) - ) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals("true", getIndexBlocksWriteSetting(indexName)) } - - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index read cation validation status is PASSED.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadWriteIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadWriteIT.kt deleted file mode 100644 index 1b530fb6b..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReadWriteIT.kt +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.common.settings.Settings -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.ReadWriteAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateReadWriteIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test read_write validation`() { - enableValidationService() - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = ReadWriteAction(0) - val states = listOf( - State("ReadWriteState", listOf(actionConfig), listOf()) - ) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, null) - // Set index to read-only - updateIndexSettings( - indexName, - Settings.builder().put("index.blocks.write", true) - ) - - assertEquals("true", getIndexBlocksWriteSetting(indexName)) - addPolicyToIndex(indexName, policyID) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to wait two cycles. - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals("false", getIndexBlocksWriteSetting(indexName)) } - - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index read_write action validation status is PASSED.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReplicaCountIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReplicaCountIT.kt deleted file mode 100644 index 60f941588..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateReplicaCountIT.kt +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.ReplicaCountAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateReplicaCountIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test replica count validation`() { - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = ReplicaCountAction(10, 0) - val states = listOf(State(name = "ReplicaCountState", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - // create index defaults to 1 replica - createIndex(indexName, policyID) - - assertEquals("Index did not default to 1 replica", 1, getNumberOfReplicasSetting(indexName)) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to speed up to second execution where it will trigger the first execution of the action which - // should set the replica count to the desired number - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals("Index did not set number_of_replicas to ${actionConfig.numOfReplicas}", actionConfig.numOfReplicas, getNumberOfReplicasSetting(indexName)) } - - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index replica_count action validation status is PASSED.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt deleted file mode 100644 index e6db88428..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverIT.kt +++ /dev/null @@ -1,218 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.common.unit.TimeValue -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestRetryFailedManagedIndexAction -import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings -import org.opensearch.indexmanagement.makeRequest -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionRetry -import org.opensearch.indexmanagement.waitFor -import org.opensearch.rest.RestRequest -import org.opensearch.core.rest.RestStatus -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateRolloverIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - // status: PASSED - fun `test skip rollover`() { - enableValidationService() - val index1 = "index-1" - val alias1 = "x" - val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) - actionConfig.configRetry = ActionRetry(0) - val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - createPolicy(policy, policyID) - createIndex(index1, policyID) - changeAlias(index1, alias1, "add", true) - updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1) - - val managedIndexConfig = getExistingManagedIndexConfig(index1) - - // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) } - - updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_SKIP.key, "true") - - val response = client().makeRequest( - RestRequest.Method.POST.toString(), - "${RestRetryFailedManagedIndexAction.RETRY_BASE_URI}/$index1" - ) - assertEquals("Unexpected RestStatus", RestStatus.OK, response.restStatus()) - - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - val data = getExplainValidationResult(index1) - assertEquals( - "Index rollover validation status is pass.", - Validate.ValidationStatus.PASSED, data.validationStatus - ) - assertEquals( - "Index rollover validation message is skipped rollover", - ValidateRollover.getSkipRolloverMessage(index1), data.validationMessage - ) - } - } - - // status: PASSED - fun `test rollover has already been rolled over`() { - enableValidationService() - val aliasName = "${testIndexName}_alias" - val indexNameBase = "${testIndexName}_index" - val index1 = "$indexNameBase-1" - val policyID = "${testIndexName}_testPolicyName_1" - val actionConfig = RolloverAction(null, null, null, null, false, 0) - val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - // create index defaults - createIndex(index1, policyID, aliasName) - - val managedIndexConfig = getExistingManagedIndexConfig(index1) - - // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) } - - // Rollover the alias manually before ISM tries to roll it over - rolloverIndex(aliasName) - - // Need to speed up to second execution where it will trigger the first execution of the action - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - val data = getExplainValidationResult(index1) - assertEquals( - "Index rollover validation status is PASSED.", - Validate.ValidationStatus.PASSED, data.validationStatus - ) - assertEquals( - "Index rollover validation message is already rolled over", - ValidateRollover.getAlreadyRolledOverMessage(index1, aliasName), data.validationMessage - ) - } - assertTrue("New rollover index does not exist.", indexExists("$indexNameBase-000002")) - } - - // status: RE_VALIDATING - fun `test rollover does not have rollover alias index setting`() { - enableValidationService() - val index1 = "index-1" - val index2 = "index-2" - val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) - actionConfig.configRetry = ActionRetry(0) - val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - createPolicy(policy, policyID) - createIndex(index1, policyID) - createIndex(index2, policyID) - - val managedIndexConfig = getExistingManagedIndexConfig(index1) - - // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) } - - // Need to speed up to second execution where it will trigger the first execution of the action - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - val data = getExplainValidationResult(index1) - assertEquals( - "Index rollover validation status is RE_VALIDATING", - Validate.ValidationStatus.RE_VALIDATING, data.validationStatus - ) - assertEquals( - "Index rollover validation message is no alias index setting", - ValidateRollover.getFailedNoValidAliasMessage(index1), data.validationMessage - ) - } - } - - // status: RE_VALIDATING - fun `test rollover not write index`() { - enableValidationService() - val index1 = "index-1" - val index2 = "index-2" - val alias1 = "x" - val policyID = "${testIndexName}_precheck" - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) - actionConfig.configRetry = ActionRetry(0) - val states = listOf(State(name = "RolloverAction", actions = listOf(actionConfig), transitions = listOf())) - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - createPolicy(policy, policyID) - createIndex(index1, policyID) - changeAlias(index1, alias1, "add") - updateIndexSetting(index1, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1) - createIndex(index2, policyID) - changeAlias(index2, alias1, "add", true) - updateIndexSetting(index2, ManagedIndexSettings.ROLLOVER_ALIAS.key, alias1) - - val managedIndexConfig = getExistingManagedIndexConfig(index1) - - // Change the start time so the job will trigger in 2 seconds, this will trigger the first initialization of the policy - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(index1).policyID) } - - // Need to speed up to second execution where it will trigger the first execution of the action - updateManagedIndexConfigStartTime(managedIndexConfig) - waitFor { - val data = getExplainValidationResult(index1) - assertEquals( - "Index rollover validation status is RE_VALIDATING.", - Validate.ValidationStatus.RE_VALIDATING, data.validationStatus - ) - assertEquals( - "Index rollover validation message is not write index", - ValidateRollover.getFailedWriteIndexMessage(index1), data.validationMessage - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt deleted file mode 100644 index 367a38e05..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateRolloverTests.kt +++ /dev/null @@ -1,72 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import com.nhaarman.mockitokotlin2.mock -import com.nhaarman.mockitokotlin2.whenever -import kotlinx.coroutines.runBlocking -import org.opensearch.client.Client -import org.opensearch.cluster.ClusterState -import org.opensearch.cluster.metadata.IndexAbstraction -import org.opensearch.cluster.metadata.IndexMetadata -import org.opensearch.cluster.metadata.Metadata -import org.opensearch.cluster.service.ClusterService -import org.opensearch.common.settings.Settings -import org.opensearch.common.unit.TimeValue -import org.opensearch.indexmanagement.indexstatemanagement.action.RolloverAction -import org.opensearch.indexmanagement.indexstatemanagement.validation.ValidateRollover.Companion.getFailedNoValidAliasMessage -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionMetaData -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData -import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepContext -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.jobscheduler.spi.utils.LockService -import org.opensearch.monitor.jvm.JvmService -import org.opensearch.script.ScriptService -import org.opensearch.test.OpenSearchTestCase -import java.util.* - -class ValidateRolloverTests : OpenSearchTestCase() { - private val scriptService: ScriptService = mock() - private val settings: Settings = Settings.EMPTY - private val clusterService: ClusterService = mock() - private val jvmService: JvmService = mock() - private val indexName: String = "test" - private val metadata = ManagedIndexMetaData( - indexName, "indexUuid", "policy_id", null, null, null, null, null, null, null, - ActionMetaData - ("rollover", 1, 0, false, 0, null, null), - null, null, null - ) - val actionConfig = RolloverAction(null, 3, TimeValue.timeValueDays(2), null, false, 0) - private val client: Client = mock() - private val lockService: LockService = LockService(mock(), clusterService) - private val validate = ValidateRollover(settings, clusterService, jvmService) - private val clusterState: ClusterState = mock() - private val clusterServiceMetadata: Metadata = mock() - private val indexAbstraction: IndexAbstraction = mock() - private val indicesLookup: SortedMap = mock() - private val listOfMetadata: MutableList = mock() - private val indexMetadata: IndexMetadata = mock() - - fun `test rollover when missing rollover alias`() { - val metadata = metadata.copy() - val context = StepContext(metadata, clusterService, client, null, null, scriptService, settings, lockService) - whenever(context.clusterService.state()).thenReturn(clusterState) - whenever(clusterState.metadata()).thenReturn(clusterServiceMetadata) - whenever(clusterServiceMetadata.indicesLookup).thenReturn(indicesLookup) - whenever(indicesLookup[indexName]).thenReturn(indexAbstraction) - whenever(indexAbstraction.indices).thenReturn(listOfMetadata) - whenever(clusterServiceMetadata.index(indexName)).thenReturn(indexMetadata) - whenever(indexMetadata.settings).thenReturn(settings) - - // null pointer exception - runBlocking { - validate.execute(indexName) - } - assertEquals("Validation status is RE_VALIDATING", Validate.ValidationStatus.RE_VALIDATING, validate.validationStatus) - assertEquals("Info message is NO VALID ALIAS", getFailedNoValidAliasMessage(indexName), validate.validationMessage) - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateSnapshotIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateSnapshotIT.kt deleted file mode 100644 index 3f6dfd42c..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateSnapshotIT.kt +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateSnapshotIT : IndexStateManagementRestTestCase() { - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test basic snapshot validation`() { - val indexName = "${testIndexName}_index_basic" - val policyID = "${testIndexName}_policy_basic" - val repository = "repository" - val snapshot = "snapshot" - val actionConfig = SnapshotAction(repository, snapshot, 0) - val states = listOf( - State("Snapshot", listOf(actionConfig), listOf()) - ) - - createRepository(repository) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Change the start time so the job will trigger in 2 seconds. - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Need to wait two cycles for wait for snapshot step - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index snapshot action validation status is PASSED.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - } - - waitFor { assertSnapshotExists(repository, "snapshot") } - waitFor { assertSnapshotFinishedWithSuccess(repository, "snapshot") } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateTransitionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateTransitionIT.kt deleted file mode 100644 index a2383b7b6..000000000 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/validation/ValidateTransitionIT.kt +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.indexmanagement.indexstatemanagement.validation - -import org.opensearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase -import org.opensearch.indexmanagement.indexstatemanagement.model.Conditions -import org.opensearch.indexmanagement.indexstatemanagement.model.Policy -import org.opensearch.indexmanagement.indexstatemanagement.model.State -import org.opensearch.indexmanagement.indexstatemanagement.model.Transition -import org.opensearch.indexmanagement.indexstatemanagement.randomErrorNotification -import org.opensearch.indexmanagement.spi.indexstatemanagement.Validate -import org.opensearch.indexmanagement.waitFor -import java.time.Instant -import java.time.temporal.ChronoUnit -import java.util.Locale - -class ValidateTransitionIT : IndexStateManagementRestTestCase() { - - private val testIndexName = javaClass.simpleName.lowercase(Locale.ROOT) - - fun `test transition validation with doc count condition`() { - enableValidationService() - val indexName = "${testIndexName}_index_1" - val policyID = "${testIndexName}_testPolicyName_1" - val secondStateName = "second" - val states = listOf( - State("first", listOf(), listOf(Transition(secondStateName, Conditions(docCount = 5L)))), - State(secondStateName, listOf(), listOf()) - ) - - val policy = Policy( - id = policyID, - description = "$testIndexName description", - schemaVersion = 1L, - lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), - errorNotification = randomErrorNotification(), - defaultState = states[0].name, - states = states - ) - - createPolicy(policy, policyID) - createIndex(indexName, policyID) - - val managedIndexConfig = getExistingManagedIndexConfig(indexName) - - // Initializing the policy/metadata - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } - - // Add 6 documents (>5) - insertSampleData(indexName, 6) - - // Evaluating transition conditions for second time - updateManagedIndexConfigStartTime(managedIndexConfig) - - waitFor { - val data = getExplainValidationResult(indexName) - assertEquals( - "Index transition validation status is PASSED.", - Validate.ValidationStatus.PASSED, - data.validationStatus - ) - } - } -} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt index 3dfb1b579..8ca6a32c3 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/RollupRestTestCase.kt @@ -13,10 +13,8 @@ import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader import org.junit.After import org.junit.AfterClass -import org.junit.Before import org.opensearch.client.Request import org.opensearch.client.Response -import org.opensearch.client.ResponseException import org.opensearch.client.RestClient import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler @@ -38,9 +36,7 @@ import org.opensearch.indexmanagement.util._ID import org.opensearch.indexmanagement.util._PRIMARY_TERM import org.opensearch.indexmanagement.util._SEQ_NO import org.opensearch.indexmanagement.waitFor -import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule import org.opensearch.core.rest.RestStatus -import java.time.Duration import java.time.Instant abstract class RollupRestTestCase : IndexManagementRestTestCase() { @@ -91,24 +87,6 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { } } - @Before - fun setDebugLogLevel() { - client().makeRequest( - "PUT", "_cluster/settings", - StringEntity( - """ - { - "transient": { - "logger.org.opensearch.indexmanagement.rollup":"DEBUG", - "logger.org.opensearch.jobscheduler":"DEBUG" - } - } - """.trimIndent(), - APPLICATION_JSON - ) - ) - } - protected fun createRollup( rollup: Rollup, rollupId: String, @@ -269,41 +247,6 @@ abstract class RollupRestTestCase : IndexManagementRestTestCase() { protected fun Rollup.toHttpEntity(): HttpEntity = StringEntity(toJsonString(), APPLICATION_JSON) - protected fun updateRollupStartTime(update: Rollup, desiredStartTimeMillis: Long? = null) { - // Before updating start time of a job always make sure there are no unassigned shards that could cause the config - // index to move to a new node and negate this forced start - if (isMultiNode) { - waitFor { - try { - client().makeRequest("GET", "_cluster/allocation/explain") - fail("Expected 400 Bad Request when there are no unassigned shards to explain") - } catch (e: ResponseException) { - assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) - } - } - } - val intervalSchedule = (update.jobSchedule as IntervalSchedule) - val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() - val startTimeMillis = desiredStartTimeMillis ?: (Instant.now().toEpochMilli() - millis) - val waitForActiveShards = if (isMultiNode) "all" else "1" - // TODO flaky: Add this log to confirm this update is missed by job scheduler - // This miss is because shard remove, job scheduler deschedule on the original node and reschedule on another node - // However the shard comes back, and job scheduler deschedule on the another node and reschedule on the original node - // During this period, this update got missed - // Since from the log, this happens very fast (within 0.1~0.2s), the above cluster explain may not have the granularity to catch this. - logger.info("Update rollup start time to $startTimeMillis") - val response = client().makeRequest( - "POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.id}?wait_for_active_shards=$waitForActiveShards&refresh=true", - StringEntity( - "{\"doc\":{\"rollup\":{\"schedule\":{\"interval\":{\"start_time\":" + - "\"$startTimeMillis\"}}}}}", - APPLICATION_JSON - ) - ) - - assertEquals("Request failed", RestStatus.OK, response.restStatus()) - } - protected fun updateSearchAllJobsClusterSetting(value: Boolean) { val formattedValue = "\"${value}\"" val request = """ diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index 9174340ed..e34dd687e 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -271,8 +271,6 @@ class RollupRunnerIT : RollupRestTestCase() { // Setting the interval to something large to minimize this scenario. fun `test no-op execution when a full window of time to rollup is not available`() { val indexName = "test_index_runner_third" - - // Define rollup var rollup = randomRollup().copy( id = "$testName-2", enabled = true, @@ -290,7 +288,6 @@ class RollupRunnerIT : RollupRestTestCase() { // Create source index createRollupSourceIndex(rollup) - // Add a document using the rollup's DateHistogram source field to ensure a metadata document is created putDateDocumentInSourceIndex(rollup) @@ -313,7 +310,6 @@ class RollupRunnerIT : RollupRestTestCase() { assertNotNull("Rollup metadata not found", previousRollupMetadata) assertEquals("Unexpected metadata status", RollupMetadata.Status.INIT, previousRollupMetadata!!.status) } - assertNotNull("Previous rollup metadata was not saved", previousRollupMetadata) // Update rollup start time to run second execution diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerIT.kt index 95a4a58d4..85a9d98bb 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SMRunnerIT.kt @@ -37,11 +37,9 @@ class SMRunnerIT : SnapshotManagementRestTestCase() { assertNotNull(explainMetadata.creation!!.trigger.time) } - // Wait for cron schedule to meet - Thread.sleep(60_000L) - // Create condition met updateSMPolicyStartTime(smPolicy) + updateSMMetadata(getSMPolicy(smPolicy.policyName)) waitFor { val explainMetadata = parseExplainResponse(explainSMPolicy(policyName).entity.content).first() assertNotNull(explainMetadata.creation!!.started) diff --git a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt index 2d1712f70..aa7df873c 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/snapshotmanagement/SnapshotManagementRestTestCase.kt @@ -149,6 +149,51 @@ abstract class SnapshotManagementRestTestCase : IndexManagementRestTestCase() { assertEquals("Request failed", RestStatus.OK, response.restStatus()) } + /** + * This method updates the trigger time so snapshot creation can happen quickly + */ + protected fun updateSMMetadata(update: SMPolicy, desiredStartTimeMillis: Long? = null) { + // Before updating start time of a job always make sure there are no unassigned shards that could cause the config + // index to move to a new node and negate this forced start + if (isMultiNode) { + waitFor { + try { + client().makeRequest("GET", "_cluster/allocation/explain") + fail("Expected 400 Bad Request when there are no unassigned shards to explain") + } catch (e: ResponseException) { + assertEquals(RestStatus.BAD_REQUEST, e.response.restStatus()) + } + } + } + val intervalSchedule = (update.jobSchedule as IntervalSchedule) + val millis = Duration.of(intervalSchedule.interval.toLong(), intervalSchedule.unit).minusSeconds(2).toMillis() + val startTimeMillis = desiredStartTimeMillis ?: (now().toEpochMilli() - millis) + val waitForActiveShards = if (isMultiNode) "all" else "1" + val response = client().makeRequest( + "POST", "$INDEX_MANAGEMENT_INDEX/_update/${update.metadataID}?wait_for_active_shards=$waitForActiveShards", + StringEntity( + """ + { + "doc": { + "sm_metadata": { + "policy_seq_no": ${update.seqNo}, + "policy_primary_term": ${update.primaryTerm}, + "creation": { + "trigger": { + "time": $startTimeMillis + } + } + } + } + } + """.trimIndent(), + APPLICATION_JSON + ) + ) + + assertEquals("Request failed", RestStatus.OK, response.restStatus()) + } + fun parseExplainResponse(inputStream: InputStream): List { val parser = createParser(XContentType.JSON.xContent(), inputStream) // val parser = createParser(builder) diff --git a/worksheets/sm/create.http b/worksheets/sm/create.http index 9b0d4b648..2d30ba65c 100644 --- a/worksheets/sm/create.http +++ b/worksheets/sm/create.http @@ -14,7 +14,7 @@ Content-Type: application/json DELETE localhost:9200/.opendistro-ism-config ### index sm -PUT localhost:9200/_plugins/_sm/daily_snapshot +POST localhost:9200/_plugins/_sm/policies/daily_snapshot Content-Type: application/json { @@ -60,6 +60,22 @@ Content-Type: application/json # } # }, +### +POST localhost:9200/.opendistro-ism-config/_update/daily_snapshot-sm-metadata +Content-Type: application/json + +{ + "doc": { + "sm_metadata": { + "creation": { + "trigger": { + "time": 1234567 + } + } + } + } +} + ### register repo PUT localhost:9200/_snapshot/repo Content-Type: application/json