From 629a0c30a1a867129df99b1fb4821cdc4fdcfd0d Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Thu, 30 May 2024 02:36:43 +0000 Subject: [PATCH] changes to support generic inputs and triggers in remote monitors Signed-off-by: Subhobrata Dey --- .../org/opensearch/alerting/AlertingPlugin.kt | 2 + .../SampleRemoteMonitorRestHandler.java | 39 ++++++++++-- .../inputs/SampleRemoteMonitorInput1.java | 55 +++++++++++++++++ .../inputs/SampleRemoteMonitorInput2.java | 46 ++++++++++++++ .../runners/SampleRemoteMonitorRunner1.java | 60 +++++++++++++++---- .../runners/SampleRemoteMonitorRunner2.java | 49 +++++++++++---- .../SampleRemoteMonitorTriggerRunResult.java | 2 +- .../triggers/SampleRemoteMonitorTrigger1.java | 55 +++++++++++++++++ .../alerting/SampleRemoteMonitorIT.java | 11 +++- 9 files changed, 287 insertions(+), 32 deletions(-) create mode 100644 sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteMonitorInput1.java create mode 100644 sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteMonitorInput2.java rename sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/{triggers => trigger/results}/SampleRemoteMonitorTriggerRunResult.java (98%) create mode 100644 sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/triggers/SampleRemoteMonitorTrigger1.java diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index aacac2610..2812a190c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -98,6 +98,7 @@ import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger import org.opensearch.core.action.ActionResponse import org.opensearch.core.common.io.stream.NamedWriteableRegistry import org.opensearch.core.common.io.stream.StreamInput @@ -240,6 +241,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ClusterMetricsInput.XCONTENT_REGISTRY, DocumentLevelTrigger.XCONTENT_REGISTRY, ChainedAlertTrigger.XCONTENT_REGISTRY, + RemoteMonitorTrigger.XCONTENT_REGISTRY, Workflow.XCONTENT_REGISTRY ) } diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java index 341887f40..65b433d6b 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -6,17 +6,26 @@ package org.opensearch.alerting; import org.opensearch.action.support.WriteRequest; -import org.opensearch.alerting.monitor.runners.SampleRemoteDocLevelMonitorRunner; +import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1; +import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2; +import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1; import org.opensearch.client.node.NodeClient; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.xcontent.json.JsonXContent; import org.opensearch.commons.alerting.action.AlertingActions; import org.opensearch.commons.alerting.action.IndexMonitorRequest; import org.opensearch.commons.alerting.action.IndexMonitorResponse; import org.opensearch.commons.alerting.model.DataSources; import org.opensearch.commons.alerting.model.DocLevelMonitorInput; +import org.opensearch.commons.alerting.model.DocLevelQuery; import org.opensearch.commons.alerting.model.IntervalSchedule; import org.opensearch.commons.alerting.model.Monitor; +import org.opensearch.commons.alerting.model.action.Action; +import org.opensearch.commons.alerting.model.action.Throttle; +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput; +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.core.rest.RestStatus; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.index.seqno.SequenceNumbers; @@ -24,6 +33,8 @@ import org.opensearch.rest.BytesRestResponse; import org.opensearch.rest.RestRequest; import org.opensearch.rest.RestResponse; +import org.opensearch.script.Script; +import org.opensearch.script.ScriptType; import java.io.IOException; import java.time.Instant; @@ -52,6 +63,17 @@ public List routes() { @Override protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { String runMonitorParam = restRequest.param("run_monitor"); + + SampleRemoteMonitorInput1 input1 = new SampleRemoteMonitorInput1("hello", Map.of("test", 1.0f), 1); + BytesStreamOutput out = new BytesStreamOutput(); + input1.writeTo(out); + BytesReference input1Serialized = out.bytes(); + + SampleRemoteMonitorTrigger1 trigger1 = new SampleRemoteMonitorTrigger1("hello", Map.of("test", 1.0f), 1); + BytesStreamOutput outTrigger = new BytesStreamOutput(); + trigger1.writeTo(outTrigger); + BytesReference trigger1Serialized = outTrigger.bytes(); + Monitor monitor1 = new Monitor( Monitor.NO_ID, Monitor.NO_VERSION, @@ -63,8 +85,11 @@ protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient SampleRemoteMonitorPlugin.SAMPLE_REMOTE_MONITOR1, null, 0, - List.of(), - List.of(), + List.of(new RemoteMonitorInput(input1Serialized)), + List.of(new RemoteMonitorTrigger("id", "name", "1", + List.of(new Action("name", "destinationId", new Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "Hello World", Map.of()), + new Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, "Hello World", Map.of()), false, new Throttle(60, ChronoUnit.MINUTES), + "id", null)), trigger1Serialized)), Map.of(), new DataSources(), "sample-remote-monitor-plugin" @@ -106,6 +131,12 @@ public void onFailure(Exception e) { ); }; } else if (runMonitorParam.equals("multiple")) { + SampleRemoteMonitorInput2 input2 = new SampleRemoteMonitorInput2("hello", + new DocLevelMonitorInput("test", List.of("test"), List.of(new DocLevelQuery("query", "query", List.of(), "test:1", List.of())))); + BytesStreamOutput out1 = new BytesStreamOutput(); + input2.writeTo(out1); + BytesReference input1Serialized1 = out1.bytes(); + Monitor monitor2 = new Monitor( Monitor.NO_ID, Monitor.NO_VERSION, @@ -117,7 +148,7 @@ public void onFailure(Exception e) { SampleRemoteMonitorPlugin.SAMPLE_REMOTE_MONITOR2, null, 0, - List.of(), + List.of(new RemoteMonitorInput(input1Serialized1)), List.of(), Map.of(), new DataSources(), diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteMonitorInput1.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteMonitorInput1.java new file mode 100644 index 000000000..fe27fdfe7 --- /dev/null +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteMonitorInput1.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.monitor.inputs; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Map; + +public class SampleRemoteMonitorInput1 implements Writeable { + + private String a; + + private Map b; + + private int c; + + public SampleRemoteMonitorInput1(String a, Map b, int c) { + this.a = a; + this.b = b; + this.c = c; + } + + public SampleRemoteMonitorInput1(StreamInput sin) throws IOException { + this( + sin.readString(), + sin.readMap(), + sin.readInt() + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(a); + out.writeMap(b); + out.writeInt(c); + } + + public int getC() { + return c; + } + + public Map getB() { + return b; + } + + public String getA() { + return a; + } +} \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteMonitorInput2.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteMonitorInput2.java new file mode 100644 index 000000000..5f35f9a5b --- /dev/null +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/inputs/SampleRemoteMonitorInput2.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.monitor.inputs; + +import org.opensearch.commons.alerting.model.DocLevelMonitorInput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; + +public class SampleRemoteMonitorInput2 implements Writeable { + + private String a; + + private DocLevelMonitorInput b; + + public SampleRemoteMonitorInput2(String a, DocLevelMonitorInput b) { + this.a = a; + this.b = b; + } + + public SampleRemoteMonitorInput2(StreamInput sin) throws IOException { + this( + sin.readString(), + DocLevelMonitorInput.readFrom(sin) + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(a); + b.writeTo(out); + } + + public String getA() { + return a; + } + + public DocLevelMonitorInput getB() { + return b; + } +} \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/runners/SampleRemoteMonitorRunner1.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/runners/SampleRemoteMonitorRunner1.java index cdee6fdd6..f2e67f460 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/runners/SampleRemoteMonitorRunner1.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/runners/SampleRemoteMonitorRunner1.java @@ -9,13 +9,21 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest; -import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTriggerRunResult; +import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput1; +import org.opensearch.alerting.monitor.trigger.results.SampleRemoteMonitorTriggerRunResult; +import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTrigger1; import org.opensearch.alerting.spi.RemoteMonitorRunner; import org.opensearch.client.Client; +import org.opensearch.commons.alerting.model.Input; import org.opensearch.commons.alerting.model.InputRunResults; import org.opensearch.commons.alerting.model.Monitor; import org.opensearch.commons.alerting.model.MonitorRunResult; +import org.opensearch.commons.alerting.model.Trigger; import org.opensearch.commons.alerting.model.TriggerRunResult; +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput; +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorTrigger; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.transport.TransportService; import java.time.Instant; @@ -57,17 +65,45 @@ public MonitorRunResult runMonitor( String executionId, TransportService transportService ) { - IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER1_INDEX) - .source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - this.client.index(indexRequest); + try { + BytesReference customInputSerialized = null; + Input input = monitor.getInputs().get(0); + if (input instanceof RemoteMonitorInput) { + customInputSerialized = ((RemoteMonitorInput) input).getInput(); + } + StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes); + SampleRemoteMonitorInput1 remoteMonitorInput = new SampleRemoteMonitorInput1(sin); + + BytesReference customTriggerSerialized = null; + Trigger trigger = monitor.getTriggers().get(0); + if (trigger instanceof RemoteMonitorTrigger) { + customTriggerSerialized = ((RemoteMonitorTrigger) trigger).getTrigger(); + } + StreamInput triggerSin = StreamInput.wrap(customTriggerSerialized.toBytesRef().bytes); + SampleRemoteMonitorTrigger1 remoteMonitorTrigger = new SampleRemoteMonitorTrigger1(triggerSin); + + IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER1_INDEX) + .source(Map.of(remoteMonitorInput.getA(), remoteMonitorTrigger.getC())).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + this.client.index(indexRequest); + + return new MonitorRunResult<>( + monitor.getName(), + periodStart, + periodEnd, + null, + new InputRunResults(), + Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of())) + ); + } catch (Exception ex) { + return new MonitorRunResult<>( + monitor.getName(), + periodStart, + periodEnd, + ex, + new InputRunResults(), + Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", ex, Map.of())) + ); + } - return new MonitorRunResult<>( - monitor.getName(), - periodStart, - periodEnd, - null, - new InputRunResults(), - Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of())) - ); } } \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/runners/SampleRemoteMonitorRunner2.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/runners/SampleRemoteMonitorRunner2.java index d4a5d4ce7..31f732b51 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/runners/SampleRemoteMonitorRunner2.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/runners/SampleRemoteMonitorRunner2.java @@ -9,13 +9,18 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.support.WriteRequest; -import org.opensearch.alerting.monitor.triggers.SampleRemoteMonitorTriggerRunResult; +import org.opensearch.alerting.monitor.inputs.SampleRemoteMonitorInput2; +import org.opensearch.alerting.monitor.trigger.results.SampleRemoteMonitorTriggerRunResult; import org.opensearch.alerting.spi.RemoteMonitorRunner; import org.opensearch.client.Client; +import org.opensearch.commons.alerting.model.Input; import org.opensearch.commons.alerting.model.InputRunResults; import org.opensearch.commons.alerting.model.Monitor; import org.opensearch.commons.alerting.model.MonitorRunResult; import org.opensearch.commons.alerting.model.TriggerRunResult; +import org.opensearch.commons.alerting.model.remote.monitors.RemoteMonitorInput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.transport.TransportService; import java.time.Instant; @@ -57,17 +62,37 @@ public MonitorRunResult runMonitor( String executionId, TransportService transportService ) { - IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER2_INDEX) - .source(Map.of("sample", "record")).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); - this.client.index(indexRequest); + try { + BytesReference customInputSerialized = null; + Input input = monitor.getInputs().get(0); + if (input instanceof RemoteMonitorInput) { + customInputSerialized = ((RemoteMonitorInput) input).getInput(); + } + StreamInput sin = StreamInput.wrap(customInputSerialized.toBytesRef().bytes); + SampleRemoteMonitorInput2 remoteMonitorInput = new SampleRemoteMonitorInput2(sin); + + IndexRequest indexRequest = new IndexRequest(SAMPLE_MONITOR_RUNNER2_INDEX) + .source(Map.of(remoteMonitorInput.getB().name(), remoteMonitorInput.getB().getQueries().get(0).getQuery())) + .setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); + this.client.index(indexRequest); - return new MonitorRunResult<>( - monitor.getName(), - periodStart, - periodEnd, - null, - new InputRunResults(), - Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of())) - ); + return new MonitorRunResult<>( + monitor.getName(), + periodStart, + periodEnd, + null, + new InputRunResults(), + Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", null, Map.of())) + ); + } catch (Exception ex) { + return new MonitorRunResult<>( + monitor.getName(), + periodStart, + periodEnd, + ex, + new InputRunResults(), + Map.of("test-trigger", new SampleRemoteMonitorTriggerRunResult("test-trigger", ex, Map.of())) + ); + } } } \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/triggers/SampleRemoteMonitorTriggerRunResult.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/trigger/results/SampleRemoteMonitorTriggerRunResult.java similarity index 98% rename from sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/triggers/SampleRemoteMonitorTriggerRunResult.java rename to sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/trigger/results/SampleRemoteMonitorTriggerRunResult.java index 4aea8ebe0..80b0914bf 100644 --- a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/triggers/SampleRemoteMonitorTriggerRunResult.java +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/trigger/results/SampleRemoteMonitorTriggerRunResult.java @@ -3,7 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ -package org.opensearch.alerting.monitor.triggers; +package org.opensearch.alerting.monitor.trigger.results; import org.opensearch.commons.alerting.model.ActionRunResult; import org.opensearch.commons.alerting.model.TriggerRunResult; diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/triggers/SampleRemoteMonitorTrigger1.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/triggers/SampleRemoteMonitorTrigger1.java new file mode 100644 index 000000000..1f04e4a4e --- /dev/null +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/monitor/triggers/SampleRemoteMonitorTrigger1.java @@ -0,0 +1,55 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.monitor.triggers; + +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; +import java.util.Map; + +public class SampleRemoteMonitorTrigger1 implements Writeable { + + private String a; + + private Map b; + + private int c; + + public SampleRemoteMonitorTrigger1(String a, Map b, int c) { + this.a = a; + this.b = b; + this.c = c; + } + + public SampleRemoteMonitorTrigger1(StreamInput sin) throws IOException { + this( + sin.readString(), + sin.readMap(), + sin.readInt() + ); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeString(a); + out.writeMap(b); + out.writeInt(c); + } + + public int getC() { + return c; + } + + public Map getB() { + return b; + } + + public String getA() { + return a; + } +} \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java b/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java index 0dfa49582..9f68f3069 100644 --- a/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java +++ b/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -68,7 +69,9 @@ public void testSingleSampleMonitor() throws IOException, InterruptedException { LoggingDeprecationHandler.INSTANCE, searchResponse.getEntity().getContent() ).map(); - found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1); + found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1 && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("hello") && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("hello").toString().equals("1")); return found.get(); } catch (IOException ex) { return false; @@ -126,12 +129,14 @@ public void testMultipleSampleMonitors() throws IOException, InterruptedExceptio LoggingDeprecationHandler.INSTANCE, searchResponse.getEntity().getContent() ).map(); - found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1); + found.set(Integer.parseInt((((Map) ((Map) searchResponseJson.get("hits")).get("total")).get("value")).toString()) == 1 && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).containsKey("doc_level_input") && + ((Map) ((List>) ((Map) searchResponseJson.get("hits")).get("hits")).get(0).get("_source")).get("doc_level_input").toString().equals("test:1")); return found.get(); } catch (IOException ex) { return false; } - }, 1, TimeUnit.SECONDS); + }, 10, TimeUnit.SECONDS); Assert.assertTrue(found.get()); }