Skip to content

Commit

Permalink
changes to support generic inputs and triggers in remote monitors
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <[email protected]>
  • Loading branch information
sbcd90 committed May 30, 2024
1 parent 315d8ac commit 629a0c3
Show file tree
Hide file tree
Showing 9 changed files with 287 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger
import org.opensearch.core.action.ActionResponse
import org.opensearch.core.common.io.stream.NamedWriteableRegistry
import org.opensearch.core.common.io.stream.StreamInput
Expand Down Expand Up @@ -240,6 +241,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY,
ChainedAlertTrigger.XCONTENT_REGISTRY,
RemoteMonitorTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,35 @@
package org.opensearch.alerting;

import org.opensearch.action.support.WriteRequest;
import org.opensearch.alerting.monitor.runners.SampleRemoteDocLevelMonitorRunner;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1;
import org.opensearch.client.node.NodeClient;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.commons.alerting.action.AlertingActions;
import org.opensearch.commons.alerting.action.IndexMonitorRequest;
import org.opensearch.commons.alerting.action.IndexMonitorResponse;
import org.opensearch.commons.alerting.model.DataSources;
import org.opensearch.commons.alerting.model.DocLevelMonitorInput;
import org.opensearch.commons.alerting.model.DocLevelQuery;
import org.opensearch.commons.alerting.model.IntervalSchedule;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.action.Action;
import org.opensearch.commons.alerting.model.action.Throttle;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.index.seqno.SequenceNumbers;
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.rest.RestResponse;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -52,6 +63,17 @@ public List<Route> routes() {
@Override
protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException {
String runMonitorParam = restRequest.param("run_monitor");

SampleRemoteMonitorInput1 input1 = new SampleRemoteMonitorInput1("hello", Map.of("test", 1.0f), 1);
BytesStreamOutput out = new BytesStreamOutput();
input1.writeTo(out);
BytesReference input1Serialized = out.bytes();

SampleRemoteMonitorTrigger1 trigger1 = new SampleRemoteMonitorTrigger1("hello", Map.of("test", 1.0f), 1);
BytesStreamOutput outTrigger = new BytesStreamOutput();
trigger1.writeTo(outTrigger);
BytesReference trigger1Serialized = outTrigger.bytes();

Monitor monitor1 = new Monitor(
Monitor.NO_ID,
Monitor.NO_VERSION,
Expand All @@ -63,8 +85,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient
SampleRemoteMonitorPlugin.SAMPLE_REMOTE_MONITOR1,
null,
0,
List.of(),
List.of(),
List.of(new RemoteMonitorInput(input1Serialized)),
List.of(new RemoteMonitorTrigger("id", "name", "1",
List.of(new Action("name", "destinationId", new Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "Hello World", Map.of()),
new Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "Hello World", Map.of()), false, new Throttle(60, ChronoUnit.MINUTES),
"id", null)), trigger1Serialized)),
Map.of(),
new DataSources(),
"sample-remote-monitor-plugin"
Expand Down Expand Up @@ -106,6 +131,12 @@ public void onFailure(Exception e) {
);
};
} else if (runMonitorParam.equals("multiple")) {
SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello",
new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of()))));
BytesStreamOutput out1 = new BytesStreamOutput();
input2.writeTo(out1);
BytesReference input1Serialized1 = out1.bytes();

Monitor monitor2 = new Monitor(
Monitor.NO_ID,
Monitor.NO_VERSION,
Expand All @@ -117,7 +148,7 @@ public void onFailure(Exception e) {
SampleRemoteMonitorPlugin.SAMPLE_REMOTE_MONITOR2,
null,
0,
List.of(),
List.of(new RemoteMonitorInput(input1Serialized1)),
List.of(),
Map.of(),
new DataSources(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.monitor.inputs;

import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;
import java.util.Map;

public class SampleRemoteMonitorInput1 implements Writeable {

private String a;

private Map<String, Object> b;

private int c;

public SampleRemoteMonitorInput1(String a, Map<String, Object> b, int c) {
this.a = a;
this.b = b;
this.c = c;
}

public SampleRemoteMonitorInput1(StreamInput sin) throws IOException {
this(
sin.readString(),
sin.readMap(),
sin.readInt()
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(a);
out.writeMap(b);
out.writeInt(c);
}

public int getC() {
return c;
}

public Map<String, Object> getB() {
return b;
}

public String getA() {
return a;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.monitor.inputs;

import org.opensearch.commons.alerting.model.DocLevelMonitorInput;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.common.io.stream.Writeable;

import java.io.IOException;

public class SampleRemoteMonitorInput2 implements Writeable {

private String a;

private DocLevelMonitorInput b;

public SampleRemoteMonitorInput2(String a, DocLevelMonitorInput b) {
this.a = a;
this.b = b;
}

public SampleRemoteMonitorInput2(StreamInput sin) throws IOException {
this(
sin.readString(),
DocLevelMonitorInput.readFrom(sin)
);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeString(a);
b.writeTo(out);
}

public String getA() {
return a;
}

public DocLevelMonitorInput getB() {
return b;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,21 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTriggerRunResult;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1;
import org.opensearch.alerting.monitor.trigger.results.SampleRemoteMonitorTriggerRunResult;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1;
import org.opensearch.alerting.spi.RemoteMonitorRunner;
import org.opensearch.client.Client;
import org.opensearch.commons.alerting.model.Input;
import org.opensearch.commons.alerting.model.InputRunResults;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.MonitorRunResult;
import org.opensearch.commons.alerting.model.Trigger;
import org.opensearch.commons.alerting.model.TriggerRunResult;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.transport.TransportService;

import java.time.Instant;
Expand Down Expand Up @@ -57,17 +65,45 @@ public MonitorRunResult<TriggerRunResult> runMonitor(
String executionId,
TransportService transportService
) {
IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER1_INDEX)
.source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest);
try {
BytesReference customInputSerialized = null;
Input input = monitor.getInputs().get(0);
if (input instanceof RemoteMonitorInput) {
customInputSerialized = ((RemoteMonitorInput) input).getInput();
}
StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes);
SampleRemoteMonitorInput1 remoteMonitorInput = new SampleRemoteMonitorInput1(sin);

BytesReference customTriggerSerialized = null;
Trigger trigger = monitor.getTriggers().get(0);
if (trigger instanceof RemoteMonitorTrigger) {
customTriggerSerialized = ((RemoteMonitorTrigger) trigger).getTrigger();
}
StreamInput triggerSin = StreamInput.wrap(customTriggerSerialized.toBytesRef().bytes);
SampleRemoteMonitorTrigger1 remoteMonitorTrigger = new SampleRemoteMonitorTrigger1(triggerSin);

IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER1_INDEX)
.source(Map.of(remoteMonitorInput.getA(), remoteMonitorTrigger.getC())).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest);

return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
null,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of()))
);
} catch (Exception ex) {
return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
ex,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", ex, Map.of()))
);
}

return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
null,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of()))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,18 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTriggerRunResult;
import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2;
import org.opensearch.alerting.monitor.trigger.results.SampleRemoteMonitorTriggerRunResult;
import org.opensearch.alerting.spi.RemoteMonitorRunner;
import org.opensearch.client.Client;
import org.opensearch.commons.alerting.model.Input;
import org.opensearch.commons.alerting.model.InputRunResults;
import org.opensearch.commons.alerting.model.Monitor;
import org.opensearch.commons.alerting.model.MonitorRunResult;
import org.opensearch.commons.alerting.model.TriggerRunResult;
import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.transport.TransportService;

import java.time.Instant;
Expand Down Expand Up @@ -57,17 +62,37 @@ public MonitorRunResult<TriggerRunResult> runMonitor(
String executionId,
TransportService transportService
) {
IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER2_INDEX)
.source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest);
try {
BytesReference customInputSerialized = null;
Input input = monitor.getInputs().get(0);
if (input instanceof RemoteMonitorInput) {
customInputSerialized = ((RemoteMonitorInput) input).getInput();
}
StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes);
SampleRemoteMonitorInput2 remoteMonitorInput = new SampleRemoteMonitorInput2(sin);

IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER2_INDEX)
.source(Map.of(remoteMonitorInput.getB().name(), remoteMonitorInput.getB().getQueries().get(0).getQuery()))
.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
this.client.index(indexRequest);

return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
null,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of()))
);
return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
null,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of()))
);
} catch (Exception ex) {
return new MonitorRunResult<>(
monitor.getName(),
periodStart,
periodEnd,
ex,
new InputRunResults(),
Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", ex, Map.of()))
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.monitor.triggers;
package org.opensearch.alerting.monitor.trigger.results;

import org.opensearch.commons.alerting.model.ActionRunResult;
import org.opensearch.commons.alerting.model.TriggerRunResult;
Expand Down
Loading

0 comments on commit 629a0c3

Please sign in to comment.