Skip to content

Commit

Permalink
misc changes and basic ITs
Browse files Browse the repository at this point in the history
Signed-off-by: Dennis Toepker <[email protected]>
  • Loading branch information
toepkerd-zz committed May 29, 2024
1 parent 85ba751 commit 1569cca
Show file tree
Hide file tree
Showing 16 changed files with 250 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.CROSS_CLUSTER_MONITORING_ENABLED,
AlertingSettings.ALERTING_NOTES_ENABLED,
AlertingSettings.NOTES_HISTORY_ENABLED,
AlertingSettings.NOTES_HISTORY_MAX_DOCS,
AlertingSettings.NOTES_HISTORY_INDEX_MAX_AGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.BucketLevelTrigger
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Note
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
Expand All @@ -59,7 +60,6 @@ import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.UUID
import org.opensearch.commons.alerting.model.Note

object BucketLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ class AlertIndices(
)
}

private suspend fun getIndicesToDelete(clusterStateResponse: ClusterStateResponse): List<String> {
private fun getIndicesToDelete(clusterStateResponse: ClusterStateResponse): List<String> {
val indicesToDelete = mutableListOf<String>()
for (entry in clusterStateResponse.state.metadata.indices) {
val indexMetaData = entry.value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,6 @@ class NotesIndices(
}

companion object {
// const val NOTES_INDEX = ".opensearch-alerting-notes"

/** The alias of the index in which to write notes finding */
const val NOTES_HISTORY_WRITE_INDEX = ".opensearch-alerting-notes-history-write"

Expand Down Expand Up @@ -109,14 +107,6 @@ class NotesIndices(
*
* @param actionListener A callback listener for the index creation call. Generally in the form of onSuccess, onFailure
*/
// fun initNotesIndex(actionListener: ActionListener<CreateIndexResponse>) {
// if (!notesIndexExists()) {
// var indexRequest = CreateIndexRequest(NOTES_INDEX)
// .mapping(notesMapping())
// .settings(Settings.builder().put("index.hidden", true).build())
// client.indices().create(indexRequest, actionListener)
// }
// }

fun onMaster() {
try {
Expand Down Expand Up @@ -175,24 +165,6 @@ class NotesIndices(
return notesHistoryEnabled
}

// suspend fun createOrUpdateInitialNotesHistoryIndex(dataSources: DataSources) {
// if (dataSources.notesIndex == NotesIndices.NOTES_INDEX) {
// return createOrUpdateInitialNotesHistoryIndex()
// }
// if (!clusterService.state().metadata.hasAlias(dataSources.notesHistoryIndex)) {
// createIndex(
// dataSources.notesHistoryIndexPattern ?: NOTES_HISTORY_INDEX_PATTERN,
// notesMapping(),
// dataSources.notesHistoryIndex
// )
// } else {
// updateIndexMapping(
// dataSources.notesHistoryIndex ?: NOTES_HISTORY_WRITE_INDEX,
// notesMapping(),
// true
// )
// }
// }
suspend fun createOrUpdateInitialNotesHistoryIndex() {
if (!isNotesHistoryInitialized()) {
notesHistoryIndexInitialized = createIndex(NOTES_HISTORY_INDEX_PATTERN, notesMapping(), NOTES_HISTORY_WRITE_INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,10 @@ private fun indexNoteResponse(channel: RestChannel, restMethod: RestRequest.Meth
returnStatus = RestStatus.OK

val restResponse = BytesRestResponse(returnStatus, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS))
if (returnStatus == RestStatus.CREATED) {
val location = "${AlertingPlugin.MONITOR_BASE_URI}/alerts/notes/${response.id}"
restResponse.addHeader("Location", location)
}
// if (returnStatus == RestStatus.CREATED) {
// val location = "${AlertingPlugin.MONITOR_BASE_URI}/alerts/notes/${response.id}"
// restResponse.addHeader("Location", location)
// }
return restResponse
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,6 @@ class RestSearchMonitorAction(
channel.request().xContentRegistry,
LoggingDeprecationHandler.INSTANCE, hit.sourceAsString
).use { hitsParser ->
log.info("monitor hit sourceAsString: ${hit.sourceAsString}")
log.info("monitor parser curr token: ${hitsParser.currentToken()}")
hitsParser.nextToken()
log.info("monitor parser next token: ${hitsParser.currentToken()}")
val monitor = ScheduledJob.parse(hitsParser, hit.id, hit.version)
val xcb = monitor.toXContent(jsonBuilder(), EMPTY_PARAMS)
hit.sourceRef(BytesReference.bytes(xcb))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,39 +224,45 @@ class AlertingSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val ALERTING_NOTES_ENABLED = Setting.boolSetting(
"plugins.alerting.notes_enabled",
false,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val NOTES_HISTORY_ENABLED = Setting.boolSetting(
"plugins.notes_history_enabled",
"plugins.alerting.notes_history_enabled",
true,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val NOTES_HISTORY_MAX_DOCS = Setting.longSetting(
"plugins.notes_history_max_docs",
"plugins.alerting.notes_history_max_docs",
1000L,
0L,
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val NOTES_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting(
"plugins.notes_history_max_age",
"plugins.alerting.notes_history_max_age",
TimeValue(30, TimeUnit.DAYS),
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val NOTES_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting(
"plugins.notes_history_rollover_period",
"plugins.alerting.notes_history_rollover_period",
TimeValue(12, TimeUnit.HOURS),
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val NOTES_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting(
"plugins.notes_history_retention_period",
"plugins.alerting.notes_history_retention_period",
TimeValue(60, TimeUnit.DAYS),
Setting.Property.NodeScope, Setting.Property.Dynamic
)

val NOTES_MAX_CONTENT_SIZE = Setting.longSetting(
"plugins.notes.max_content_size",
"plugins.alerting.notes.max_content_size",
2000L,
0L,
Setting.Property.NodeScope, Setting.Property.Dynamic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,29 @@ class TransportDeleteNoteAction @Inject constructor(
),
SecureTransportAction {

@Volatile private var alertingNotesEnabled = AlertingSettings.ALERTING_NOTES_ENABLED.get(settings)
@Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.ALERTING_NOTES_ENABLED) { alertingNotesEnabled = it }
listenFilterBySettingChange(clusterService)
}

override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener<DeleteNoteResponse>) {
// validate feature flag enabled
if (!alertingNotesEnabled) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Notes for Alerting is currently disabled", RestStatus.FORBIDDEN),
)
)
return
}

val transformedRequest = request as? DeleteNoteRequest
?: recreateObject(request) { DeleteNoteRequest(it) }

val user = readUserFromThreadContext(client)
// val deleteRequest = DeleteRequest(ALL_NOTES_INDEX_PATTERN, transformedRequest.noteId)

if (!validateUserBackendRoles(user, actionListener)) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.alerting.notes.NotesIndices
import org.opensearch.alerting.notes.NotesIndices.Companion.NOTES_HISTORY_WRITE_INDEX
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERTING_NOTES_ENABLED
import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT
import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_NOTES_PER_ALERT
import org.opensearch.alerting.settings.AlertingSettings.Companion.NOTES_MAX_CONTENT_SIZE
Expand Down Expand Up @@ -80,13 +81,15 @@ constructor(
),
SecureTransportAction {

@Volatile private var alertingNotesEnabled = ALERTING_NOTES_ENABLED.get(settings)
@Volatile private var notesMaxContentSize = NOTES_MAX_CONTENT_SIZE.get(settings)
@Volatile private var maxNotesPerAlert = MAX_NOTES_PER_ALERT.get(settings)
@Volatile private var indexTimeout = INDEX_TIMEOUT.get(settings)

@Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ALERTING_NOTES_ENABLED) { alertingNotesEnabled = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(NOTES_MAX_CONTENT_SIZE) { notesMaxContentSize = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(MAX_NOTES_PER_ALERT) { maxNotesPerAlert = it }
clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_TIMEOUT) { indexTimeout = it }
Expand All @@ -98,6 +101,16 @@ constructor(
request: ActionRequest,
actionListener: ActionListener<IndexNoteResponse>,
) {
// validate feature flag enabled
if (!alertingNotesEnabled) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Notes for Alerting is currently disabled", RestStatus.FORBIDDEN),
)
)
return
}

val transformedRequest =
request as? IndexNoteRequest
?: recreateObject(request, namedWriteableRegistry) {
Expand Down Expand Up @@ -143,7 +156,7 @@ constructor(
// Also need to check if user has permissions to add a Note to the passed in Alert. To do this,
// we retrieve the Alert to get its associated monitor user, and use that to
// check if they have permissions to the Monitor that generated the Alert
val queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("_id", listOf(request.alertId)))
val queryBuilder = QueryBuilders.boolQuery().must(QueryBuilders.termsQuery("_id", listOf(request.entityId)))
val searchSourceBuilder =
SearchSourceBuilder()
.version(true)
Expand Down Expand Up @@ -173,7 +186,7 @@ constructor(
if (alerts.isEmpty()) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Alert with ID ${request.alertId} is not found", RestStatus.NOT_FOUND),
OpenSearchStatusException("Alert with ID ${request.entityId} is not found", RestStatus.NOT_FOUND),
)
)
return
Expand All @@ -197,7 +210,7 @@ constructor(
log.info("checking user permissions in index note")
checkUserPermissionsWithResource(user, alert.monitorUser, actionListener, "monitor", alert.monitorId)

val note = Note(alertId = request.alertId, content = request.content, createdTime = Instant.now(), user = user)
val note = Note(entityId = request.entityId, content = request.content, createdTime = Instant.now(), user = user)

val indexRequest =
IndexRequest(NOTES_HISTORY_WRITE_INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.ActionRequest
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand All @@ -34,6 +35,7 @@ import org.opensearch.commons.authuser.User
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
Expand All @@ -59,13 +61,24 @@ class TransportSearchNoteAction @Inject constructor(
),
SecureTransportAction {

@Volatile
override var filterByEnabled: Boolean = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
@Volatile private var alertingNotesEnabled = AlertingSettings.ALERTING_NOTES_ENABLED.get(settings)
@Volatile override var filterByEnabled: Boolean = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings)
init {
clusterService.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.ALERTING_NOTES_ENABLED) { alertingNotesEnabled = it }
listenFilterBySettingChange(clusterService)
}

override fun doExecute(task: Task, request: ActionRequest, actionListener: ActionListener<SearchResponse>) {
// validate feature flag enabled
if (!alertingNotesEnabled) {
actionListener.onFailure(
AlertingException.wrap(
OpenSearchStatusException("Notes for Alerting is currently disabled", RestStatus.FORBIDDEN),
)
)
return
}

val transformedRequest = request as? SearchNoteRequest
?: recreateObject(request, namedWriteableRegistry) {
SearchNoteRequest(it)
Expand Down Expand Up @@ -108,7 +121,7 @@ class TransportSearchNoteAction @Inject constructor(
val queryBuilder = searchNoteRequest.searchRequest.source().query() as BoolQueryBuilder
searchNoteRequest.searchRequest.source().query(
queryBuilder.filter(
QueryBuilders.termsQuery(Note.ALERT_ID_FIELD, alertIDs)
QueryBuilders.termsQuery(Note.ENTITY_ID_FIELD, alertIDs)
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,13 @@ import org.opensearch.commons.alerting.model.DocumentLevelTrigger
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.FindingWithDocs
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.Note
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.util.string
import org.opensearch.commons.authuser.User
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.ToXContent
Expand Down Expand Up @@ -521,6 +523,37 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return alert.copy(id = alertJson["_id"] as String, version = (alertJson["_version"] as Int).toLong())
}

protected fun createAlertNote(alertId: String, content: String): Note {
val createRequestBody = jsonBuilder()
.startObject()
.field(Note.NOTE_CONTENT_FIELD, content)
.endObject()
.string()

val createResponse = client().makeRequest(
"POST",
"$ALERTING_BASE_URI/alerts/$alertId/notes",
StringEntity(createRequestBody, APPLICATION_JSON)
)

assertEquals("Unable to create a new alert", RestStatus.CREATED, createResponse.restStatus())

val responseBody = createResponse.asMap()
val noteId = responseBody["_id"] as String
assertNotEquals("response is missing Id", Note.NO_ID, noteId)

val note = responseBody["note"] as Map<*, *>

return Note(
id = noteId,
entityId = note["entity_id"] as String,
content = note["content"] as String,
createdTime = Instant.ofEpochMilli(note["created_time"] as Long),
lastUpdatedTime = if (note["last_updated_time"] != null) Instant.ofEpochMilli(note["last_updated_time"] as Long) else null,
user = note["user"]?.let { User(it as String, emptyList(), emptyList(), emptyList()) }
)
}

protected fun createRandomMonitor(refresh: Boolean = false, withMetadata: Boolean = false): Monitor {
val monitor = randomQueryLevelMonitor(withMetadata = withMetadata)
val monitorId = createMonitor(monitor, refresh).id
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ val WORKFLOW_ALERTING_BASE_URI = "/_plugins/_alerting/workflows"
val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"
val LEGACY_OPENDISTRO_ALERTING_BASE_URI = "/_opendistro/_alerting/monitors"
val LEGACY_OPENDISTRO_DESTINATION_BASE_URI = "/_opendistro/_alerting/destinations"
val ALERTING_NOTES_BASE_URI = ""
val ALWAYS_RUN = Script("return true")
val NEVER_RUN = Script("return false")
val DRYRUN_MONITOR = mapOf("dryrun" to "true")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
package org.opensearch.alerting.alerts

class NotesIndicesIT {
import org.opensearch.alerting.AlertingRestTestCase

class NotesIndicesIT : AlertingRestTestCase() {
fun `test create initial notes index`() {
}
}
Loading

0 comments on commit 1569cca

Please sign in to comment.