From 46973ecf0e612357dba8f2fe41750ed74e8f52c9 Mon Sep 17 00:00:00 2001 From: Katherine Shen <40495707+shenkw1@users.noreply.github.com> Date: Fri, 13 Dec 2024 11:47:18 -0600 Subject: [PATCH 1/5] Schema revisions (#5260) * fix schemas Signed-off-by: Katherine Shen --- .../aggregate/AggregateProcessorConfig.java | 2 +- .../processor/LambdaProcessorConfig.java | 27 +++++++++++++------ .../keyvalue/KeyValueProcessorConfig.java | 4 +-- .../otelmetrics/OTelMetricsSourceTest.java | 1 - 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java index bd08de94c6..e1522b23ed 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorConfig.java @@ -31,7 +31,7 @@ public class AggregateProcessorConfig { @JsonPropertyDescription("An unordered list by which to group events. Events with the same values as these keys are put into the same group. " + "If an event does not contain one of the identification_keys, then the value of that key is considered to be equal to null. " + - "At least one identification_key is required. And example configuration is [\"sourceIp\", \"destinationIp\", \"port\"].") + "At least one identification_key is required. An example configuration is [\"sourceIp\", \"destinationIp\", \"port\"].") @JsonProperty("identification_keys") @NotEmpty @ExampleValues({ diff --git a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java index 5d7e05d7c0..cfa7417896 100644 --- a/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java +++ b/data-prepper-plugins/aws-lambda/src/main/java/org/opensearch/dataprepper/plugins/lambda/processor/LambdaProcessorConfig.java @@ -7,33 +7,44 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonClassDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import org.opensearch.dataprepper.model.annotations.ExampleValues; +import org.opensearch.dataprepper.model.annotations.ExampleValues.Example; import java.util.Collections; import java.util.List; import org.opensearch.dataprepper.plugins.lambda.common.config.InvocationType; import org.opensearch.dataprepper.plugins.lambda.common.config.LambdaCommonConfig; +@JsonPropertyOrder +@JsonClassDescription("The aws_lambda processor enables invocation of an AWS Lambda function within your Data Prepper pipeline in order to process events." 
+ + "It supports both synchronous and asynchronous invocations based on your use case.") public class LambdaProcessorConfig extends LambdaCommonConfig { + static final String DEFAULT_INVOCATION_TYPE = "request-response"; - @JsonPropertyDescription("invocation type defines the way we want to call lambda function") - @JsonProperty("invocation_type") + @JsonPropertyDescription("Specifies the invocation type, either request-response or event. Default is request-response.") + @JsonProperty(value = "invocation_type", defaultValue = DEFAULT_INVOCATION_TYPE) private InvocationType invocationType = InvocationType.REQUEST_RESPONSE; - @JsonPropertyDescription("Defines the way Data Prepper treats the response from Lambda") + @JsonPropertyDescription("Specifies how Data Prepper interprets and processes Lambda function responses. Default is false.") @JsonProperty("response_events_match") private boolean responseEventsMatch = false; - @JsonPropertyDescription("defines a condition for event to use this processor") - @JsonProperty("lambda_when") - private String whenCondition; - @JsonProperty("tags_on_failure") @JsonPropertyDescription( "A List of Strings that specifies the tags to be set in the event when lambda fails to " + "or exception occurs. This tag may be used in conditional expressions in " + - "other parts of the configuration") + "other parts of the configuration.") private List tagsOnFailure = Collections.emptyList(); + @JsonPropertyDescription("Defines a condition for event to use this processor.") + @ExampleValues({ + @Example(value = "event['status'] == 'process'", description = "The processor will only run on this condition.") + }) + @JsonProperty("lambda_when") + private String whenCondition; + public List getTagsOnFailure() { return tagsOnFailure; } diff --git a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java index 9c8edc7db3..501dbb9d54 100644 --- a/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java +++ b/data-prepper-plugins/key-value-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/keyvalue/KeyValueProcessorConfig.java @@ -124,13 +124,13 @@ public class KeyValueProcessorConfig { }) private Object nonMatchValue = DEFAULT_NON_MATCH_VALUE; - @JsonProperty(value = "include_keys", defaultValue = "[]") + @JsonProperty("include_keys") @JsonPropertyDescription("An array specifying the keys that should be included in the destination field. " + "By default, all keys will be added.") @NotNull private List includeKeys = DEFAULT_INCLUDE_KEYS; - @JsonProperty(value = "exclude_keys", defaultValue = "[]") + @JsonProperty("exclude_keys") @JsonPropertyDescription("An array specifying the parsed keys that should be excluded from the destination field. 
" + "By default, no keys will be excluded.") @NotNull diff --git a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java index f3342a612e..4e67c050f5 100644 --- a/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java +++ b/data-prepper-plugins/otel-metrics-source/src/test/java/org/opensearch/dataprepper/plugins/source/otelmetrics/OTelMetricsSourceTest.java @@ -36,7 +36,6 @@ import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceRequest; import io.opentelemetry.proto.collector.metrics.v1.ExportMetricsServiceResponse; import io.opentelemetry.proto.collector.metrics.v1.MetricsServiceGrpc; -import io.opentelemetry.proto.collector.trace.v1.TraceServiceGrpc; import io.opentelemetry.proto.metrics.v1.NumberDataPoint; import io.opentelemetry.proto.common.v1.InstrumentationLibrary; From a12471ca8921960aca84671d7dacf282843948b5 Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Fri, 13 Dec 2024 11:40:41 -0800 Subject: [PATCH 2/5] Add metrics to stream acknowledgement manager (#5256) * Add metrics to stream acknowledgement manager Signed-off-by: Souvik Bose * Fix the unit test failure Signed-off-by: Souvik Bose * Fix checkstyle Signed-off-by: Souvik Bose * Address comments Signed-off-by: Souvik Bose --------- Signed-off-by: Souvik Bose Co-authored-by: Souvik Bose --- .../stream/StreamAcknowledgementManager.java | 35 ++++++++++++-- .../plugins/mongo/stream/StreamScheduler.java | 2 +- .../StreamAcknowledgementManagerTest.java | 48 +++++++++++++++++-- 3 files changed, 77 insertions(+), 8 deletions(-) diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java index 7567a22786..38d4fc9794 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManager.java @@ -1,7 +1,9 @@ package org.opensearch.dataprepper.plugins.mongo.stream; import com.google.common.annotations.VisibleForTesting; +import io.micrometer.core.instrument.Counter; import org.opensearch.dataprepper.common.concurrent.BackgroundThreadFactory; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.slf4j.Logger; @@ -16,6 +18,8 @@ import java.util.concurrent.Executors; import java.util.function.Consumer; +import static org.opensearch.dataprepper.logging.DataPrepperMarkers.NOISY; + public class StreamAcknowledgementManager { private static final Logger LOG = LoggerFactory.getLogger(StreamAcknowledgementManager.class); @@ -32,17 +36,35 @@ public class StreamAcknowledgementManager { private boolean enableAcknowledgement = false; + private final Counter positiveAcknowledgementSets; + private final Counter negativeAcknowledgementSets; + private final Counter recordsCheckpointed; + private final Counter noDataExtendLeaseCount; + private final Counter giveupPartitionCount; + 
public static final String POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "positiveAcknowledgementSets"; + public static final String NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME = "negativeAcknowledgementSets"; + public static final String RECORDS_CHECKPOINTED = "recordsCheckpointed"; + public static final String NO_DATA_EXTEND_LEASE_COUNT = "noDataExtendLeaseCount"; + public static final String GIVE_UP_PARTITION_COUNT = "giveupPartitionCount"; + + public StreamAcknowledgementManager(final AcknowledgementSetManager acknowledgementSetManager, final DataStreamPartitionCheckpoint partitionCheckpoint, final Duration partitionAcknowledgmentTimeout, final int acknowledgementMonitorWaitTimeInMs, - final int checkPointIntervalInMs) { + final int checkPointIntervalInMs, + final PluginMetrics pluginMetrics) { this.acknowledgementSetManager = acknowledgementSetManager; this.partitionCheckpoint = partitionCheckpoint; this.partitionAcknowledgmentTimeout = partitionAcknowledgmentTimeout; this.acknowledgementMonitorWaitTimeInMs = acknowledgementMonitorWaitTimeInMs; this.checkPointIntervalInMs = checkPointIntervalInMs; - executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor")); + this.executorService = Executors.newSingleThreadExecutor(BackgroundThreadFactory.defaultExecutorThreadFactory("mongodb-stream-ack-monitor")); + this.positiveAcknowledgementSets = pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME); + this.negativeAcknowledgementSets = pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME); + this.recordsCheckpointed = pluginMetrics.counter(RECORDS_CHECKPOINTED); + this.noDataExtendLeaseCount = pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT); + this.giveupPartitionCount = pluginMetrics.counter(GIVE_UP_PARTITION_COUNT); } void init(final Consumer stopWorkerConsumer) { @@ -79,17 +101,19 @@ private void monitorAcknowledgment(final ExecutorService executorService, final if (checkpointStatus.isNegativeAcknowledgement()) { // Give up partition and should interrupt parent thread to stop processing stream if (lastCheckpointStatus != null && lastCheckpointStatus.isPositiveAcknowledgement()) { - partitionCheckpoint.checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); + checkpoint(lastCheckpointStatus.getResumeToken(), lastCheckpointStatus.getRecordCount()); } LOG.warn("Acknowledgement not received for the checkpoint {} past wait time. Giving up partition.", checkpointStatus.getResumeToken()); partitionCheckpoint.giveUpPartition(); + this.giveupPartitionCount.increment(); break; } } } else { if (System.currentTimeMillis() - lastCheckpointTime >= checkPointIntervalInMs) { - LOG.debug("No records processed. Extend the lease of the partition worker."); + LOG.info(NOISY, "No records processed. 
Extend the lease of the partition worker."); partitionCheckpoint.extendLease(); + this.noDataExtendLeaseCount.increment(); lastCheckpointTime = System.currentTimeMillis(); } } @@ -111,6 +135,7 @@ private void monitorAcknowledgment(final ExecutorService executorService, final private void checkpoint(final String resumeToken, final long recordCount) { LOG.debug("Perform regular checkpointing for resume token {} at record count {}", resumeToken, recordCount); partitionCheckpoint.checkpoint(resumeToken, recordCount); + this.recordsCheckpointed.increment(recordCount); } Optional createAcknowledgementSet(final String resumeToken, final long recordNumber) { @@ -126,9 +151,11 @@ Optional createAcknowledgementSet(final String resumeToken, final CheckpointStatus ackCheckpointStatus = ackStatus.get(resumeToken); ackCheckpointStatus.setAcknowledgedTimestamp(Instant.now().toEpochMilli()); if (result) { + this.positiveAcknowledgementSets.increment(); ackCheckpointStatus.setAcknowledged(CheckpointStatus.AcknowledgmentStatus.POSITIVE_ACK); LOG.debug("Received acknowledgment of completion from sink for checkpoint {}", resumeToken); } else { + this.negativeAcknowledgementSets.increment(); ackCheckpointStatus.setAcknowledged(CheckpointStatus.AcknowledgmentStatus.NEGATIVE_ACK); LOG.warn("Negative acknowledgment received for checkpoint {}, resetting checkpoint", resumeToken); // default CheckpointStatus acknowledged value is false. The monitorCheckpoints method will time out diff --git a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java index 82a3db975d..40a80bb71f 100644 --- a/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java +++ b/data-prepper-plugins/mongodb/src/main/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamScheduler.java @@ -109,7 +109,7 @@ public void run() { private StreamWorker getStreamWorker (final StreamPartition streamPartition) { final DataStreamPartitionCheckpoint partitionCheckpoint = new DataStreamPartitionCheckpoint(sourceCoordinator, streamPartition); final StreamAcknowledgementManager streamAcknowledgementManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, - sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS); + sourceConfig.getPartitionAcknowledgmentTimeout(), DEFAULT_MONITOR_WAIT_TIME_MS, DEFAULT_CHECKPOINT_INTERVAL_MILLS, pluginMetrics); final PartitionKeyRecordConverter recordConverter = getPartitionKeyRecordConverter(streamPartition); final CollectionConfig partitionCollectionConfig = sourceConfig.getCollections().stream() .filter(collectionConfig -> collectionConfig.getCollection().equals(streamPartition.getCollection())) diff --git a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java index 4e41008627..c515df0086 100644 --- a/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java +++ b/data-prepper-plugins/mongodb/src/test/java/org/opensearch/dataprepper/plugins/mongo/stream/StreamAcknowledgementManagerTest.java @@ -1,11 +1,13 @@ package 
org.opensearch.dataprepper.plugins.mongo.stream; +import io.micrometer.core.instrument.Counter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; @@ -22,11 +24,19 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.CoreMatchers.is; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.lenient; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.GIVE_UP_PARTITION_COUNT; +import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.NO_DATA_EXTEND_LEASE_COUNT; +import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME; +import static org.opensearch.dataprepper.plugins.mongo.stream.StreamAcknowledgementManager.RECORDS_CHECKPOINTED; @ExtendWith(MockitoExtension.class) public class StreamAcknowledgementManagerTest { @@ -41,11 +51,32 @@ public class StreamAcknowledgementManagerTest { private AcknowledgementSet acknowledgementSet; @Mock private Consumer stopWorkerConsumer; + @Mock + private PluginMetrics pluginMetrics; + @Mock + private Counter positiveAcknowledgementSets; + @Mock + private Counter negativeAcknowledgementSets; + @Mock + private Counter recordsCheckpointed; + @Mock + private Counter noDataExtendLeaseCount; + @Mock + private Counter giveupPartitionCount; + private StreamAcknowledgementManager streamAckManager; + + @BeforeEach public void setup() { - streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0); + when(pluginMetrics.counter(POSITIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME)).thenReturn(positiveAcknowledgementSets); + when(pluginMetrics.counter(NEGATIVE_ACKNOWLEDGEMENT_SET_METRIC_NAME)).thenReturn(negativeAcknowledgementSets); + when(pluginMetrics.counter(RECORDS_CHECKPOINTED)).thenReturn(recordsCheckpointed); + when(pluginMetrics.counter(NO_DATA_EXTEND_LEASE_COUNT)).thenReturn(noDataExtendLeaseCount); + when(pluginMetrics.counter(GIVE_UP_PARTITION_COUNT)).thenReturn(giveupPartitionCount); + + streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, pluginMetrics); } @Test @@ -57,7 +88,7 @@ public void createAcknowledgementSet_disabled_emptyAckSet() { @Test public void createAcknowledgementSet_enabled_ackSetWithAck() { lenient().when(timeout.getSeconds()).thenReturn(10_000L); - streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0); + streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, 
pluginMetrics); streamAckManager.init(stopWorkerConsumer); final String resumeToken = UUID.randomUUID().toString(); final long recordCount = new Random().nextLong(); @@ -78,12 +109,15 @@ public void createAcknowledgementSet_enabled_ackSetWithAck() { .atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(partitionCheckpoint).checkpoint(resumeToken, recordCount)); assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue())); + verify(positiveAcknowledgementSets).increment(); + verifyNoInteractions(negativeAcknowledgementSets); + verify(recordsCheckpointed, atLeastOnce()).increment(anyDouble()); } @Test public void createAcknowledgementSet_enabled_multipleAckSetWithAck() { lenient().when(timeout.getSeconds()).thenReturn(10_000L); - streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0); + streamAckManager = new StreamAcknowledgementManager(acknowledgementSetManager, partitionCheckpoint, timeout, 0, 0, pluginMetrics); streamAckManager.init(stopWorkerConsumer); final String resumeToken1 = UUID.randomUUID().toString(); final long recordCount1 = new Random().nextLong(); @@ -114,6 +148,10 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAck() { .atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(partitionCheckpoint).checkpoint(resumeToken2, recordCount2)); assertThat(streamAckManager.getCheckpoints().peek(), is(nullValue())); + + verify(positiveAcknowledgementSets, atLeastOnce()).increment(); + verifyNoInteractions(negativeAcknowledgementSets); + verify(recordsCheckpointed, atLeastOnce()).increment(anyDouble()); } @Test @@ -149,6 +187,9 @@ public void createAcknowledgementSet_enabled_multipleAckSetWithAckFailure() { verify(partitionCheckpoint).giveUpPartition()); assertThat(streamAckManager.getCheckpoints().peek().getResumeToken(), is(resumeToken1)); assertThat(streamAckManager.getCheckpoints().peek().getRecordCount(), is(recordCount1)); + verify(positiveAcknowledgementSets).increment(); + verify(negativeAcknowledgementSets).increment(); + verify(giveupPartitionCount).increment(); verify(stopWorkerConsumer).accept(null); } @@ -173,5 +214,6 @@ public void createAcknowledgementSet_enabled_ackSetWithNoAck() { await() .atMost(Duration.ofSeconds(10)).untilAsserted(() -> verify(stopWorkerConsumer).accept(null)); + verify(negativeAcknowledgementSets).increment(); } } From 956a89a4bc2057b4f13ee424d27f62c4bdb6fc4a Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 13 Dec 2024 16:13:25 -0800 Subject: [PATCH 3/5] Fix failing data-prepper-expression code coverage verfication (#5265) Signed-off-by: Krishna Kondaka --- .../org/opensearch/dataprepper/expression/AndOperatorTest.java | 1 + .../org/opensearch/dataprepper/expression/EqualOperatorTest.java | 1 + .../dataprepper/expression/GenericNotOperatorTest.java | 1 + .../dataprepper/expression/GreaterThanOperatorTest.java | 1 + .../dataprepper/expression/GreaterThanOrEqualOperatorTest.java | 1 + .../opensearch/dataprepper/expression/LessThanOperatorTest.java | 1 + .../dataprepper/expression/LessThanOrEqualOperatorTest.java | 1 + .../opensearch/dataprepper/expression/NotInSetOperatorTest.java | 1 + .../org/opensearch/dataprepper/expression/NotOperatorTest.java | 1 + .../org/opensearch/dataprepper/expression/OrOperatorTest.java | 1 + .../dataprepper/expression/RegexEqualOperatorTest.java | 1 + .../dataprepper/expression/RegexNotEqualOperatorTest.java | 1 + .../opensearch/dataprepper/expression/TypeOfOperatorTest.java | 1 + 13 files changed, 13 insertions(+) diff 
--git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/AndOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/AndOperatorTest.java index 0437b794ce..a6a526e669 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/AndOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/AndOperatorTest.java @@ -35,6 +35,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/EqualOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/EqualOperatorTest.java index a5fe621ff2..5f9f8caf19 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/EqualOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/EqualOperatorTest.java @@ -36,6 +36,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericNotOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericNotOperatorTest.java index c75b446a6c..cb0d55a817 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericNotOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GenericNotOperatorTest.java @@ -68,6 +68,7 @@ void testGivenMatchingRuleThenReturnTrue() { .getRuleIndex(); assertThat(genericNotOperator.shouldEvaluate(context), is(true)); + assertThat(genericNotOperator.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GreaterThanOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GreaterThanOperatorTest.java index dbf8023760..0f29e98ac2 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GreaterThanOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GreaterThanOperatorTest.java @@ -35,6 +35,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GreaterThanOrEqualOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GreaterThanOrEqualOperatorTest.java index 55c41b0ca9..8e434ea7de 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GreaterThanOrEqualOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/GreaterThanOrEqualOperatorTest.java @@ -35,6 +35,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); 
when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/LessThanOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/LessThanOperatorTest.java index 7434161533..e87ac6aa4f 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/LessThanOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/LessThanOperatorTest.java @@ -35,6 +35,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/LessThanOrEqualOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/LessThanOrEqualOperatorTest.java index 94b52961a8..7a41ac66ea 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/LessThanOrEqualOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/LessThanOrEqualOperatorTest.java @@ -35,6 +35,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/NotInSetOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/NotInSetOperatorTest.java index 40ef722bdb..3cc4e4f159 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/NotInSetOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/NotInSetOperatorTest.java @@ -38,6 +38,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/NotOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/NotOperatorTest.java index 29fa4c8c3f..bcef789a5e 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/NotOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/NotOperatorTest.java @@ -35,6 +35,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/OrOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/OrOperatorTest.java index 452939f6cc..b1dc2c293a 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/OrOperatorTest.java +++ 
b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/OrOperatorTest.java @@ -35,6 +35,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexEqualOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexEqualOperatorTest.java index bb92cd1e49..894eaa4638 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexEqualOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexEqualOperatorTest.java @@ -36,6 +36,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexNotEqualOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexNotEqualOperatorTest.java index 30bc199413..052576a6d1 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexNotEqualOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/RegexNotEqualOperatorTest.java @@ -36,6 +36,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test diff --git a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/TypeOfOperatorTest.java b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/TypeOfOperatorTest.java index e7a0acb6f1..fa659b26ea 100644 --- a/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/TypeOfOperatorTest.java +++ b/data-prepper-expression/src/test/java/org/opensearch/dataprepper/expression/TypeOfOperatorTest.java @@ -44,6 +44,7 @@ void testShouldEvaluate() { assertThat(objectUnderTest.shouldEvaluate(ctx), is(true)); when(ctx.getRuleIndex()).thenReturn(-1); assertThat(objectUnderTest.shouldEvaluate(ctx), is(false)); + assertThat(objectUnderTest.isBooleanOperator(), is(true)); } @Test From 8bd39d3417b0dfef395d9d61a8b0ff721cd301ab Mon Sep 17 00:00:00 2001 From: Souvik Bose Date: Tue, 17 Dec 2024 10:10:05 -0800 Subject: [PATCH 4/5] Update expiration time based on ttl on the source coordinator item. (#5262) * Update expiration time based on ttl on the source coordinator item. 
Signed-off-by: Souvik Bose * Fix the tests for supporting expiration time Signed-off-by: Souvik Bose --------- Signed-off-by: Souvik Bose Co-authored-by: Souvik Bose --- .../dynamodb/DynamoDbClientWrapper.java | 8 +++-- .../DynamoDbSourceCoordinationStore.java | 6 ++-- .../dynamodb/DynamoDbClientWrapperTest.java | 33 +++++++++++++++---- .../DynamoDbSourceCoordinationStoreTest.java | 27 +++++++++------ 4 files changed, 53 insertions(+), 21 deletions(-) diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java index 9ed4973fc9..189fbf01f6 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapper.java @@ -237,7 +237,8 @@ public Optional getAvailablePartition(final String own final Duration ownershipTimeout, final SourcePartitionStatus sourcePartitionStatus, final String sourceStatusCombinationKey, - final int pageLimit) { + final int pageLimit, + final Duration ttl) { try { final DynamoDbIndex sourceStatusIndex = table.index(SOURCE_STATUS_COMBINATION_KEY_GLOBAL_SECONDARY_INDEX); @@ -273,8 +274,11 @@ public Optional getAvailablePartition(final String own item.setSourcePartitionStatus(SourcePartitionStatus.ASSIGNED); item.setSourceStatusCombinationKey(String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, item.getSourceIdentifier(), SourcePartitionStatus.ASSIGNED)); item.setPartitionPriority(partitionOwnershipTimeout.toString()); - final boolean acquired = this.tryAcquirePartitionItem(item); + if (Objects.nonNull(ttl)) { + item.setExpirationTime(Instant.now().plus(ttl).getEpochSecond()); + } + final boolean acquired = this.tryAcquirePartitionItem(item); if (acquired) { return Optional.of(item); } diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java index ffc10f4b64..25dde68499 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/main/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStore.java @@ -98,7 +98,7 @@ public boolean tryCreatePartitionItem(final String sourceIdentifier, public Optional tryAcquireAvailablePartition(final String sourceIdentifier, final String ownerId, final Duration ownershipTimeout) { final Optional acquiredAssignedItem = dynamoDbClientWrapper.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1); + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), 1, dynamoStoreSettings.getTtl()); if (acquiredAssignedItem.isPresent()) { 
return acquiredAssignedItem; @@ -106,7 +106,7 @@ public Optional tryAcquireAvailablePartition(final Str final Optional acquiredUnassignedItem = dynamoDbClientWrapper.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5); + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), 5, dynamoStoreSettings.getTtl()); if (acquiredUnassignedItem.isPresent()) { return acquiredUnassignedItem; @@ -114,7 +114,7 @@ public Optional tryAcquireAvailablePartition(final Str return dynamoDbClientWrapper.getAvailablePartition( ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, - String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1); + String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), 1, dynamoStoreSettings.getTtl()); } @Override diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java index 8740086539..d7c340c6a0 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbClientWrapperTest.java @@ -62,6 +62,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; @@ -464,8 +465,10 @@ void getAvailablePartition_with_no_items_from_query_returns_empty_optional(final final int pageLimit = new Random().nextInt(20); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, pageLimit); + ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, pageLimit, ttl); assertThat(result.isEmpty(), equalTo(true)); @@ -513,8 +516,10 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest(); reflectivelySetField(objectUnderTest, "table", table); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); assertThat(result.isPresent(), equalTo(true)); assertThat(result.get(), equalTo(acquiredItem)); @@ -531,6 +536,10 @@ void getAvailablePartition_will_continue_until_tryAcquirePartition_succeeds(fina 
assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true)); + final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); + verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); + verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString()); } @@ -574,8 +583,10 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest(); reflectivelySetField(objectUnderTest, "table", table); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); assertThat(result.isPresent(), equalTo(true)); assertThat(result.get(), equalTo(acquiredItem)); @@ -593,6 +604,10 @@ void getAvailablePartition_with_multiple_pages_continue_until_tryAcquirePartitio assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true)); verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString()); + + final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); + verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); } @ParameterizedTest @@ -635,8 +650,10 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi final DynamoDbClientWrapper objectUnderTest = createObjectUnderTest(); reflectivelySetField(objectUnderTest, "table", table); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.valueOf(sourcePartitionStatus), sourceStatusCombinationKey, new Random().nextInt(20), ttl); assertThat(result.isEmpty(), equalTo(true)); @@ -653,6 +670,10 @@ void getAvailablePartition_with_multiple_pages_will_iterate_through_all_items_wi assertThat(newPartitionOwnershipTimeout.isAfter(now.plus(ownershipTimeout)), equalTo(true)); verify(acquiredItem).setPartitionPriority(newPartitionOwnershipTimeout.toString()); + + final ArgumentCaptor expiryTimeArgumentCaptor = ArgumentCaptor.forClass(Long.class); + verify(acquiredItem).setExpirationTime(expiryTimeArgumentCaptor.capture()); + assertThat(expiryTimeArgumentCaptor.getValue(), greaterThan(Instant.now().getEpochSecond())); } @Test @@ -681,7 +702,7 @@ void getAvailablePartition_with_assigned_partition_with_unexpired_partitionOwner reflectivelySetField(objectUnderTest, "table", table); final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, sourceStatusCombinationKey, new Random().nextInt(20), Duration.ofSeconds(new Random().nextInt())); assertThat(result.isEmpty(), equalTo(true)); @@ -716,7 +737,7 @@ void 
getAvailablePartition_with_closed_partition_with_unreached_reOpenAt_time_re reflectivelySetField(objectUnderTest, "table", table); final Optional result = objectUnderTest.getAvailablePartition( - ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, sourceStatusCombinationKey, new Random().nextInt(20)); + ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, sourceStatusCombinationKey, new Random().nextInt(20), Duration.ofSeconds(new Random().nextInt())); assertThat(result.isEmpty(), equalTo(true)); diff --git a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java index 9730d028ab..2150fa55c8 100644 --- a/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java +++ b/data-prepper-plugins/dynamodb-source-coordination-store/src/test/java/org/opensearch/dataprepper/plugins/sourcecoordinator/dynamodb/DynamoDbSourceCoordinationStoreTest.java @@ -245,21 +245,23 @@ void getAvailablePartition_with_no_item_acquired_returns_empty_optional() { final String ownerId = UUID.randomUUID().toString(); final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), - 1)) + 1, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), - 1)) + 1, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), - 5)) + 5, ttl)) .willReturn(Optional.empty()); final Optional result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout); @@ -272,13 +274,14 @@ void getAvailablePartition_with_acquired_ASSIGNED_partition_returns_the_partitio final String ownerId = UUID.randomUUID().toString(); final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); - + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), - 1)) + 1, ttl)) .willReturn(Optional.of(acquiredItem)); final Optional result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout); @@ -294,23 +297,25 @@ void getAvailablePartition_with_acquired_CLOSED_partition_returns_the_partition( 
final String ownerId = UUID.randomUUID().toString(); final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), - 1)) + 1, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), - 5)) + 5, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.CLOSED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.CLOSED), - 1)) + 1, ttl)) .willReturn(Optional.of(acquiredItem)); final Optional result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout); @@ -326,18 +331,20 @@ void getAvailablePartition_with_acquired_UNASSIGNED_partition_returns_the_partit final String ownerId = UUID.randomUUID().toString(); final String sourceIdentifier = UUID.randomUUID().toString(); final Duration ownershipTimeout = Duration.ofMinutes(2); + final Duration ttl = Duration.ofSeconds(new Random().nextInt(5)+10); + given(dynamoStoreSettings.getTtl()).willReturn(ttl); final DynamoDbSourcePartitionItem acquiredItem = mock(DynamoDbSourcePartitionItem.class); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.ASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.ASSIGNED), - 1)) + 1, ttl)) .willReturn(Optional.empty()); given(dynamoDbClientWrapper.getAvailablePartition(ownerId, ownershipTimeout, SourcePartitionStatus.UNASSIGNED, String.format(SOURCE_STATUS_COMBINATION_KEY_FORMAT, sourceIdentifier, SourcePartitionStatus.UNASSIGNED), - 5)) + 5, ttl)) .willReturn(Optional.of(acquiredItem)); final Optional result = createObjectUnderTest().tryAcquireAvailablePartition(sourceIdentifier, ownerId, ownershipTimeout); From 708843c2cc397ba01e02b60e8baaaf262d5dd561 Mon Sep 17 00:00:00 2001 From: Krishna Kondaka Date: Fri, 20 Dec 2024 09:17:53 -0800 Subject: [PATCH 5/5] Add acknowledgement support to aggregate processor (#5139) * Addressed review comments Signed-off-by: Krishna Kondaka * Addressed review comments and added tests Signed-off-by: Krishna Kondaka * Fixed checkstyle errors Signed-off-by: Krishna Kondaka * Fixed test errors by adding await Signed-off-by: Krishna Kondaka * Addressed review comments Signed-off-by: Krishna Kondaka * Removed unnecessary API Signed-off-by: Krishna Kondaka * Fixed checkstyle error Signed-off-by: Krishna Kondaka --------- Signed-off-by: Krishna Kondaka --- .../dataprepper/model/event/JacksonEvent.java | 2 +- .../model/processor/Processor.java | 9 + .../model/processor/ProcessorTest.java | 23 + .../dataprepper/core/pipeline/Pipeline.java | 2 +- .../core/pipeline/ProcessWorker.java | 3 +- .../core/pipeline/ProcessWorkerTest.java | 57 ++ .../aggregate-processor/build.gradle | 1 + .../processor/aggregate/AggregateAction.java | 15 + .../aggregate/AggregateActionInput.java | 
8 + .../processor/aggregate/AggregateGroup.java | 20 + .../aggregate/AggregateGroupManager.java | 1 + .../aggregate/AggregateProcessor.java | 13 +- .../actions/AppendAggregateAction.java | 1 + .../actions/CountAggregateAction.java | 2 + .../actions/HistogramAggregateAction.java | 2 + .../PercentSamplerAggregateAction.java | 1 + .../actions/PutAllAggregateAction.java | 1 + .../actions/RateLimiterAggregateAction.java | 1 + .../actions/TailSamplerAggregateAction.java | 19 +- .../aggregate/AggregateActionTestUtils.java | 12 + .../aggregate/AggregateProcessorIT.java | 84 ++- .../AggregateProcessorITWithAcks.java | 621 ++++++++++++++++++ 22 files changed, 874 insertions(+), 24 deletions(-) create mode 100644 data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java create mode 100644 data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java index 47de571672..09a0705e0e 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/event/JacksonEvent.java @@ -587,7 +587,7 @@ public Builder withTimeReceived(final Instant timeReceived) { * @return returns the builder * @since 2.10 */ - protected Builder withEventHandle(final EventHandle eventHandle) { + public Builder withEventHandle(final EventHandle eventHandle) { this.eventHandle = eventHandle; return this; } diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java index 551aed3d01..784758fa95 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/processor/Processor.java @@ -33,6 +33,15 @@ public interface Processor, OutputRecord extends R */ void prepareForShutdown(); + /** + * @since 2.11 + * Indicates if the processor holds the events or not + * Holding events indicates that the events are not ready to be released. + */ + default boolean holdsEvents() { + return false; + } + /** * @since 1.2 * Returns true if the Processor's internal state is safe to be shutdown. 
diff --git a/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java new file mode 100644 index 0000000000..2fec941c4f --- /dev/null +++ b/data-prepper-api/src/test/java/org/opensearch/dataprepper/model/processor/ProcessorTest.java @@ -0,0 +1,23 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.model.processor; + +import org.junit.jupiter.api.Test; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class ProcessorTest { + + @Test + public void testDefault() { + Processor processor = mock(Processor.class); + when(processor.holdsEvents()).thenCallRealMethod(); + assertThat(processor.holdsEvents(), equalTo(false)); + } +} + diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java index b5d3f812cf..f1a247b717 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/Pipeline.java @@ -347,7 +347,7 @@ private void shutdownExecutorService(final ExecutorService executorService, fina * @param records records that needs to published to each sink * @return List of Future, each future for each sink */ - List> publishToSinks(final Collection records) { + public List> publishToSinks(final Collection records) { final int sinksSize = sinks.size(); final List> sinkFutures = new ArrayList<>(sinksSize); diff --git a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java index e313430b49..8fb314fd83 100644 --- a/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java +++ b/data-prepper-core/src/main/java/org/opensearch/dataprepper/core/pipeline/ProcessWorker.java @@ -137,7 +137,8 @@ private void doRun() { try { records = processor.execute(records); - if (inputEvents != null) { + // acknowledge missing events only if the processor is not holding events + if (!processor.holdsEvents() && inputEvents != null) { processAcknowledgements(inputEvents, records); } } catch (final Exception e) { diff --git a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java index 230bb32780..1eb28f991b 100644 --- a/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java +++ b/data-prepper-core/src/test/java/org/opensearch/dataprepper/core/pipeline/ProcessWorkerTest.java @@ -6,6 +6,8 @@ import org.mockito.Mock; import org.mockito.MockedStatic; import org.mockito.junit.jupiter.MockitoExtension; +import static org.mockito.Mockito.lenient; +import static org.mockito.Mockito.atLeast; import org.opensearch.dataprepper.core.pipeline.common.FutureHelper; import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult; import org.opensearch.dataprepper.model.CheckpointState; @@ -132,6 +134,61 @@ void testProcessWorkerHappyPathWithAcknowledgments() { } } + @Test + void 
testProcessWorkerWithProcessorsNotHoldingEvents() { + DefaultEventHandle eventHandle = mock(DefaultEventHandle.class); + Event event = mock(Event.class); + Record record = mock(Record.class); + when(eventHandle.release(true)).thenReturn(true); + lenient().when(event.getEventHandle()).thenReturn(eventHandle); + when(record.getData()).thenReturn(event); + final List records = List.of(record); + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor1 = mock(Processor.class); + when(processor1.holdsEvents()).thenReturn(false); + when(processor1.execute(records)).thenReturn(List.of()); + when(processor1.isReadyForShutdown()).thenReturn(true); + processors = List.of(processor1); + when(source.areAcknowledgementsEnabled()).thenReturn(true); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + + verify(eventHandle, atLeast(1)).release(true); + } + + + @Test + void testProcessWorkerWithProcessorsHoldingEvents() { + EventHandle eventHandle = mock(EventHandle.class); + Event event = mock(Event.class); + Record record = mock(Record.class); + lenient().when(event.getEventHandle()).thenReturn(eventHandle); + when(record.getData()).thenReturn(event); + final List records = List.of(record); + final CheckpointState checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult); + + final Processor processor1 = mock(Processor.class); + when(processor1.holdsEvents()).thenReturn(true); + when(processor1.execute(records)).thenReturn(List.of()); + when(processor1.isReadyForShutdown()).thenReturn(true); + + processors = List.of(processor1); + when(source.areAcknowledgementsEnabled()).thenReturn(true); + + final ProcessWorker processWorker = createObjectUnderTest(); + + processWorker.run(); + + verify(eventHandle, never()).release(true); + } + @Test void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() { diff --git a/data-prepper-plugins/aggregate-processor/build.gradle b/data-prepper-plugins/aggregate-processor/build.gradle index 9a3eb4551a..bc2f398b4b 100644 --- a/data-prepper-plugins/aggregate-processor/build.gradle +++ b/data-prepper-plugins/aggregate-processor/build.gradle @@ -14,6 +14,7 @@ dependencies { implementation project(':data-prepper-expression') implementation project(':data-prepper-plugins:otel-proto-common') implementation project(':data-prepper-plugins:otel-metrics-raw-processor') + testImplementation project(':data-prepper-core') implementation libs.guava.core implementation libs.commons.lang3 implementation libs.opentelemetry.proto diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java index ae798af032..541cd15d3d 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateAction.java @@ -6,6 +6,7 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; import 
org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import java.util.Collections; @@ -29,6 +30,14 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc return AggregateActionResponse.fromEvent(event); } + /** + * indicates if the action holds the events or not + * + */ + default boolean holdsEvents() { + return false; + } + /** * Concludes a group of Events * @@ -38,6 +47,12 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc * @since 1.3 */ default AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { + if (aggregateActionInput != null) { + EventHandle eventHandle = aggregateActionInput.getEventHandle(); + if (eventHandle != null) { + eventHandle.release(true); + } + } return new AggregateActionOutput(Collections.emptyList()); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java index 0bec0b2350..cd7b47d66e 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionInput.java @@ -5,6 +5,8 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.EventHandle; + import java.util.Map; import java.util.function.Function; import java.time.Duration; @@ -28,6 +30,12 @@ public interface AggregateActionInput { */ Map getIdentificationKeys(); + /** + * @return returns eventHandle held by the instance + * @since 2.11 + */ + EventHandle getEventHandle(); + /** * Sets custom shouldConclude function * diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java index 09e0e97223..14adde221f 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroup.java @@ -5,6 +5,11 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.event.InternalEventHandle; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.Event; + import java.time.Duration; import java.time.Instant; import java.util.function.Function; @@ -19,6 +24,7 @@ class AggregateGroup implements AggregateActionInput { private final Lock handleEventForGroupLock; private final Map identificationKeys; private Function customShouldConclude; + private EventHandle eventHandle; AggregateGroup(final Map identificationKeys) { this.groupState = new DefaultGroupState(); @@ -26,6 +32,19 @@ class AggregateGroup implements AggregateActionInput { this.groupStart = Instant.now(); this.concludeGroupLock = new ReentrantLock(); this.handleEventForGroupLock = new ReentrantLock(); + this.eventHandle = new 
AggregateEventHandle(Instant.now()); + } + + @Override + public EventHandle getEventHandle() { + return eventHandle; + } + + public void attachToEventAcknowledgementSet(Event event) { + InternalEventHandle internalEventHandle; + EventHandle handle = event.getEventHandle(); + internalEventHandle = (InternalEventHandle)(handle); + internalEventHandle.addEventHandle(eventHandle); } public GroupState getGroupState() { @@ -63,5 +82,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) { void resetGroup() { groupStart = Instant.now(); groupState.clear(); + this.eventHandle = new AggregateEventHandle(groupStart); } } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java index 9d271aa40b..dedf1edde0 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateGroupManager.java @@ -26,6 +26,7 @@ AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKe return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap())); } + List> getGroupsToConclude(final boolean forceConclude) { final List> groupsToConclude = new ArrayList<>(); for (final Map.Entry groupEntry : allGroups.entrySet()) { diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java index 2b19e98516..616c3c5ea8 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessor.java @@ -93,6 +93,12 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) { return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting); } + AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) { + AggregateGroup aggregateGroup = aggregateGroupManager.getAggregateGroup(identificationKeysMap); + aggregateGroup.attachToEventAcknowledgementSet(event); + return aggregateGroup; + } + @Override public Collection> doExecute(Collection> records) { final List> recordsOut = new LinkedList<>(); @@ -124,7 +130,7 @@ public Collection> doExecute(Collection> records) { continue; } final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event); - final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap); + final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap, event); final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent); @@ -149,6 +155,11 @@ public Collection> doExecute(Collection> records) { return recordsOut; } + @Override + 
public boolean holdsEvents() { + return aggregateAction.holdsEvents(); + } + public static long getTimeNanos(final Instant time) { final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000; long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano(); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java index ece5212ac4..0d930b94ba 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/AppendAggregateAction.java @@ -84,6 +84,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA final Event event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(aggregateActionInput.getGroupState()) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); return new AggregateActionOutput(List.of(event)); } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java index 8b67ca64cd..16bbf39c31 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/CountAggregateAction.java @@ -146,6 +146,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(groupState) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); } else { Integer countValue = (Integer)groupState.get(countKey); @@ -168,6 +169,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA .withValue((double)countValue) .withExemplars(List.of(exemplar)) .withAttributes(attr) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(false); event = (Event)sum; } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java index 22cfa7efb7..ac1a59a712 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/HistogramAggregateAction.java @@ -225,6 +225,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(groupState) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); } else { List explicitBoundsList = new ArrayList(); @@ -262,6 +263,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA 
.withExplicitBoundsList(explicitBoundsList) .withExemplars(exemplarList) .withAttributes(attr) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(false); event = (Event)histogram; } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java index b243dd5ef0..9b27a49dee 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PercentSamplerAggregateAction.java @@ -47,4 +47,5 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct } return AggregateActionResponse.nullEventResponse(); } + } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java index 78debabb35..54e0e2c72c 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/PutAllAggregateAction.java @@ -41,6 +41,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA final Event event = JacksonEvent.builder() .withEventType(EVENT_TYPE) .withData(aggregateActionInput.getGroupState()) + .withEventHandle(aggregateActionInput.getEventHandle()) .build(); return new AggregateActionOutput(List.of(event)); diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java index 3ea0d0b8af..ce8131b95c 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/RateLimiterAggregateAction.java @@ -42,4 +42,5 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct } return new AggregateActionResponse(event); } + } diff --git a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java index 26b245da73..fc347e0105 100644 --- a/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java +++ b/data-prepper-plugins/aggregate-processor/src/main/java/org/opensearch/dataprepper/plugins/processor/aggregate/actions/TailSamplerAggregateAction.java @@ -8,6 +8,7 @@ import 
org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateAction; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionInput; import org.opensearch.dataprepper.plugins.processor.aggregate.AggregateActionOutput; @@ -15,8 +16,9 @@ import org.opensearch.dataprepper.plugins.processor.aggregate.GroupState; import org.opensearch.dataprepper.expression.ExpressionEvaluator; -import java.util.List; import java.util.ArrayList; +import java.util.Collections; +import java.util.List; import java.util.Random; import java.time.Duration; import java.time.Instant; @@ -70,14 +72,27 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct return AggregateActionResponse.nullEventResponse(); } + @Override + public boolean holdsEvents() { + return true; + } + @Override public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) { GroupState groupState = aggregateActionInput.getGroupState(); int randomInt = random.nextInt(100); + aggregateActionInput.getEventHandle().release(true); if (((groupState.containsKey(ERROR_STATUS_KEY) && (Boolean)groupState.get(ERROR_STATUS_KEY) == true)) || (randomInt < percent)) { return new AggregateActionOutput((List)groupState.getOrDefault(EVENTS_KEY, List.of())); } - return new AggregateActionOutput(List.of()); + List events = (List)groupState.getOrDefault(EVENTS_KEY, List.of()); + for (final Event event : events) { + EventHandle eventHandle = event.getEventHandle(); + if (eventHandle != null) { + eventHandle.release(true); + } + } + return new AggregateActionOutput(Collections.emptyList()); } } diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java index 21e49e05be..b46d2bdaab 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateActionTestUtils.java @@ -5,6 +5,11 @@ package org.opensearch.dataprepper.plugins.processor.aggregate; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.event.EventHandle; + +import java.time.Duration; +import java.time.Instant; import java.util.Map; import java.util.HashMap; import java.time.Duration; @@ -15,10 +20,12 @@ public static class TestAggregateActionInput implements AggregateActionInput { private final GroupState groupState; private final Map identificationKeys; private Function customShouldConclude; + private EventHandle eventHandle; public TestAggregateActionInput(Map identificationKeys) { this.groupState = new AggregateActionTestUtils.TestGroupState(); this.identificationKeys = identificationKeys; + this.eventHandle = new AggregateEventHandle(Instant.now()); } @Override @@ -31,6 +38,11 @@ public GroupState getGroupState() { return groupState; } + @Override + public EventHandle getEventHandle() { + return eventHandle; + } + @Override public Map getIdentificationKeys() { return 
identificationKeys; diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java index a7608decec..0a5dbd6117 100644 --- a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java +++ b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorIT.java @@ -11,9 +11,14 @@ import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.event.EventHandle; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.event.JacksonEvent; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSet; +import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetMetrics; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.extension.ExtendWith; @@ -52,6 +57,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadLocalRandom; @@ -105,6 +111,7 @@ public class AggregateProcessorIT { private Collection> eventBatch; private ConcurrentLinkedQueue> aggregatedResult; private Set> uniqueEventMaps; + private Set eventHandles; @Mock private PluginFactory pluginFactory; @@ -114,6 +121,7 @@ public class AggregateProcessorIT { @BeforeEach void setup() { + eventHandles = new HashSet<>(); aggregatedResult = new ConcurrentLinkedQueue<>(); uniqueEventMaps = new HashSet<>(); @@ -186,7 +194,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -213,7 +221,7 @@ void aggregateWithConcludingGroupsOnceReturnsExpectedResult() throws Interrupted } } - @RepeatedTest(value = 2) + @RepeatedTest(value = 1) void aggregateWithPutAllActionAndCondition() throws InterruptedException { aggregateAction = new PutAllAggregateAction(); when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) @@ -222,9 +230,12 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); when(aggregateProcessorConfig.getWhenCondition()).thenReturn(condition); when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + 
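+        // The acknowledgement set here is real, not a mock: each input event handle is added to
+        // it so the test can verify that the aggregated output carries an AggregateEventHandle
+        // attached to the same set (checked via hasAcknowledgementSet() at the end of the test).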
AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); int count = 0; for (Record record: eventBatch) { Event event = record.getData(); + acknowledgementSet.add(event.getEventHandle()); boolean value = (count % 2 == 0) ? true : false; when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(value); if (!value) { @@ -238,8 +249,9 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); + final List> allRecordsOut = new ArrayList<>(); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { final List> recordsOut = (List>) objectUnderTest.doExecute(eventBatch); @@ -247,6 +259,7 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { final Map map = record.getData().toMap(); aggregatedResult.add(map); } + allRecordsOut.addAll(recordsOut); countDownLatch.countDown(); }); } @@ -259,6 +272,11 @@ void aggregateWithPutAllActionAndCondition() throws InterruptedException { for (final Map uniqueEventMap : uniqueEventMaps) { assertThat(aggregatedResult, hasItem(uniqueEventMap)); } + for (Record record: allRecordsOut) { + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); + } } @ParameterizedTest @@ -276,7 +294,7 @@ void aggregateWithPercentSamplerAction(double testPercent) throws InterruptedExc final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); AtomicInteger allowedEventsCount = new AtomicInteger(0); for (int i = 0; i < NUM_THREADS; i++) { @@ -309,7 +327,7 @@ void aggregateWithRateLimiterAction() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -344,7 +362,7 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); for (int i = 0; i < NUM_THREADS; i++) { executorService.execute(() -> { @@ -364,8 +382,8 @@ void aggregateWithRateLimiterActionNoDrops() throws InterruptedException { assertThat(aggregatedResult.size(), equalTo(NUM_THREADS * NUM_EVENTS_PER_BATCH)); } - @RepeatedTest(value = 2) - void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFieldException, IllegalAccessException { + @RepeatedTest(value = 1) + void aggregateWithCountAggregateActionKK() throws InterruptedException, NoSuchFieldException, IllegalAccessException { CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); setField(CountAggregateActionConfig.class, 
countAggregateActionConfig, "outputFormat", OutputFormat.RAW); aggregateAction = new CountAggregateAction(countAggregateActionConfig); @@ -373,6 +391,12 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel .thenReturn(aggregateAction); when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); eventBatch = getBatchOfEvents(true); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); + for (Record record: eventBatch) { + Event event = record.getData(); + acknowledgementSet.add(event.getEventHandle()); + } final AggregateProcessor objectUnderTest = createObjectUnderTest(); @@ -385,7 +409,7 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); @@ -399,10 +423,13 @@ void aggregateWithCountAggregateAction() throws InterruptedException, NoSuchFiel final Record record = (Record)results.toArray()[0]; expectedEventMap.forEach((k, v) -> assertThat(record.getData().toMap(), hasEntry(k,v))); assertThat(record.getData().toMap(), hasKey(DEFAULT_START_TIME_KEY)); + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); } @RepeatedTest(value = 2) - void aggregateWithCountAggregateActionWithCondition() throws InterruptedException, NoSuchFieldException, IllegalAccessException { + void aggregateWithCountAggregateActionWithConditionPP() throws InterruptedException, NoSuchFieldException, IllegalAccessException { CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig(); setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW); aggregateAction = new CountAggregateAction(countAggregateActionConfig); @@ -414,16 +441,19 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio when(expressionEvaluator.isValidExpressionStatement(condition)).thenReturn(true); int count = 0; eventBatch = getBatchOfEvents(true); + + final AggregateProcessor objectUnderTest = createObjectUnderTest(); + + final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); for (Record record: eventBatch) { Event event = record.getData(); + acknowledgementSet.add(event.getEventHandle()); boolean value = (count % 2 == 0) ? 
true : false; when(expressionEvaluator.evaluateConditional(condition, event)).thenReturn(value); count++; } - - final AggregateProcessor objectUnderTest = createObjectUnderTest(); - - final ExecutorService executorService = Executors.newFixedThreadPool(NUM_THREADS); final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); for (int i = 0; i < NUM_THREADS; i++) { @@ -432,7 +462,7 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); @@ -446,6 +476,9 @@ void aggregateWithCountAggregateActionWithCondition() throws InterruptedExceptio final Record record = (Record)results.toArray()[0]; expectedEventMap.forEach((k, v) -> assertThat(record.getData().toMap(), hasEntry(k,v))); assertThat(record.getData().toMap(), hasKey(DEFAULT_START_TIME_KEY)); + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); } @RepeatedTest(value = 2) @@ -460,6 +493,12 @@ void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws Interrupte .thenReturn(aggregateAction); when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); eventBatch = getBatchOfEvents(true); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); + for (Record record: eventBatch) { + Event event = record.getData(); + acknowledgementSet.add(event.getEventHandle()); + } final AggregateProcessor objectUnderTest = createObjectUnderTest(); @@ -474,7 +513,7 @@ void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws Interrupte }); } // wait longer so that the raw events are processed. 
- Thread.sleep(2*GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(2*GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); @@ -489,6 +528,9 @@ void aggregateWithCountAggregateActionWithUnaggregatedEvents() throws Interrupte assertTrue(record.getData().getMetadata().hasTags(List.of(tag))); expectedEventMap.forEach((k, v) -> assertThat(record.getData().toMap(), hasEntry(k,v))); assertThat(record.getData().toMap(), hasKey(DEFAULT_START_TIME_KEY)); + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); } @@ -518,10 +560,13 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch .thenReturn(aggregateAction); when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); eventBatch = getBatchOfEvents(true); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + AcknowledgementSet acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); for (final Record record : eventBatch) { final double value = ThreadLocalRandom.current().nextDouble(TEST_VALUE_RANGE_MIN-TEST_VALUE_RANGE_STEP, TEST_VALUE_RANGE_MAX+TEST_VALUE_RANGE_STEP); Event event = record.getData(); event.put(testKey, value); + acknowledgementSet.add(event.getEventHandle()); } final AggregateProcessor objectUnderTest = createObjectUnderTest(); @@ -556,6 +601,9 @@ void aggregateWithHistogramAggregateAction() throws InterruptedException, NoSuch for (int i = 0; i < testBuckets.size(); i++) { assertThat(testBuckets.get(i).doubleValue(), equalTo(bucketsInResult.get(i))); } + EventHandle eventHandle = record.getData().getEventHandle(); + assertTrue(eventHandle instanceof AggregateEventHandle); + assertTrue(((AggregateEventHandle)eventHandle).hasAcknowledgementSet()); } @ParameterizedTest @@ -581,7 +629,7 @@ void aggregateWithTailSamplerAction(final int testPercent) throws InterruptedExc final int numberOfSpans = 5; eventBatch = getBatchOfEventsForTailSampling(numberOfErrorTraces, numberOfSpans); objectUnderTest.doExecute(eventBatch); - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); final CountDownLatch countDownLatch = new CountDownLatch(NUM_THREADS); for (int i = 0; i < NUM_THREADS; i++) { @@ -590,7 +638,7 @@ void aggregateWithTailSamplerAction(final int testPercent) throws InterruptedExc countDownLatch.countDown(); }); } - Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 1000); + Thread.sleep(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE * 2000); boolean allThreadsFinished = countDownLatch.await(5L, TimeUnit.SECONDS); assertThat(allThreadsFinished, equalTo(true)); List errorEventList = eventBatch.stream().map(Record::getData).filter(event -> { diff --git a/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java new file mode 100644 index 0000000000..1534c65de3 --- /dev/null +++ 
b/data-prepper-plugins/aggregate-processor/src/test/java/org/opensearch/dataprepper/plugins/processor/aggregate/AggregateProcessorITWithAcks.java @@ -0,0 +1,621 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.dataprepper.plugins.processor.aggregate; + +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; +import org.opensearch.dataprepper.model.buffer.Buffer; +import static org.opensearch.dataprepper.test.helper.ReflectivelySetField.setField; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.plugin.PluginFactory; +import org.opensearch.dataprepper.model.event.AggregateEventHandle; +import org.opensearch.dataprepper.model.CheckpointState; +import org.opensearch.dataprepper.core.pipeline.common.FutureHelper; +import org.opensearch.dataprepper.core.pipeline.Pipeline; +import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult; +import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSet; +import org.opensearch.dataprepper.core.acknowledgements.DefaultAcknowledgementSetMetrics; +import org.opensearch.dataprepper.core.pipeline.ProcessWorker; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.AppendAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.AppendAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.CountAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.CountAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.HistogramAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.HistogramAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterMode; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RateLimiterAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.PercentSamplerAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.PercentSamplerAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.TailSamplerAggregateActionConfig; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.TailSamplerAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.RemoveDuplicatesAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.PutAllAggregateAction; +import org.opensearch.dataprepper.plugins.processor.aggregate.actions.OutputFormat; +import org.opensearch.dataprepper.model.processor.Processor; +import org.opensearch.dataprepper.model.event.JacksonEvent; +import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.model.source.Source; + +import static org.awaitility.Awaitility.await; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertFalse; 
+import static org.hamcrest.MatcherAssert.assertThat; +import org.mockito.Mock; +import org.mockito.MockedStatic; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.lessThan; + + +import org.apache.commons.lang3.RandomStringUtils; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.Executors; + + +public class AggregateProcessorITWithAcks { + private static final int testValue = 1; + private static final int GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE = 2; + private static final int NUM_UNIQUE_EVENTS_PER_BATCH = 8; + private static final int NUM_EVENTS_PER_BATCH = 5; + private static final Duration TEST_TIMEOUT = Duration.ofSeconds(5); + + @Mock + private Pipeline pipeline; + @Mock + private Buffer buffer; + @Mock + private Source source; + @Mock + private PluginFactory pluginFactory; + @Mock + private ExpressionEvaluator expressionEvaluator; + @Mock + private CheckpointState checkpointState; + @Mock + private PluginModel actionConfiguration; + @Mock + private AggregateProcessorConfig aggregateProcessorConfig; + private int callCount; + private boolean aggregatedResultReceived; + List> records; + private String testKey; + + private PluginMetrics pluginMetrics; + private List processors; + private List> sinkFutures; + AcknowledgementSet acknowledgementSet; + ScheduledExecutorService scheduledExecutorService; + List> aggregatedResults; + + @BeforeEach + void setup() { + testKey = UUID.randomUUID().toString(); + pluginMetrics = PluginMetrics.fromNames(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + scheduledExecutorService = Executors.newScheduledThreadPool(3); + acknowledgementSet = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); + final List identificationKeys = new ArrayList<>(); + identificationKeys.add("firstRandomNumber"); + identificationKeys.add("secondRandomNumber"); + identificationKeys.add("thirdRandomNumber"); + callCount = 0; + aggregatedResultReceived = false; + aggregatedResults = new ArrayList<>(); + + pipeline = mock(Pipeline.class); + source = mock(Source.class); + buffer = mock(Buffer.class); + processors = List.of(); + aggregateProcessorConfig = mock(AggregateProcessorConfig.class); + actionConfiguration = mock(PluginModel.class); + pluginFactory = mock(PluginFactory.class); + expressionEvaluator = mock(ExpressionEvaluator.class); + when(pipeline.isStopRequested()).thenReturn(false).thenReturn(true); + when(source.areAcknowledgementsEnabled()).thenReturn(true); + when(pipeline.getSource()).thenReturn(source); + when(buffer.isEmpty()).thenReturn(true); + when(pipeline.getPeerForwarderDrainTimeout()).thenReturn(Duration.ofMillis(100)); + when(pipeline.getReadBatchTimeoutInMillis()).thenReturn(500); + when(aggregateProcessorConfig.getOutputUnaggregatedEvents()).thenReturn(false); + 
when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(identificationKeys); + when(aggregateProcessorConfig.getWhenCondition()).thenReturn(null); + + records = getRecords(testKey, testValue, acknowledgementSet); + acknowledgementSet.complete(); + checkpointState = mock(CheckpointState.class); + final Map.Entry readResult = Map.entry(records, checkpointState); + + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenAnswer(a -> { + if (callCount == 0) { + callCount++; + return Map.entry(records, checkpointState); + } else { + return Map.entry(List.of(), checkpointState); + } + }); + + final Future sinkFuture = mock(Future.class); + sinkFutures = List.of(sinkFuture); + doAnswer( a -> { + List> receivedRecords = (List>)a.getArgument(0); + if (receivedRecords.size() > 0) { + aggregatedResults = receivedRecords; + for (Record record: receivedRecords) { + if (record.getData().getEventHandle() instanceof AggregateEventHandle) { + aggregatedResultReceived = true; + } + record.getData().getEventHandle().release(true); + } + } + + return sinkFutures; + }).when(pipeline).publishToSinks(any()); + when(aggregateProcessorConfig.getAggregateAction()).thenReturn(actionConfiguration); + when(actionConfiguration.getPluginName()).thenReturn(UUID.randomUUID().toString()); + when(actionConfiguration.getPluginSettings()).thenReturn(Collections.emptyMap()); + } + + @Test + public void testHistogramAggregation() throws Exception { + HistogramAggregateActionConfig histogramAggregateActionConfig = new HistogramAggregateActionConfig(); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "outputFormat", OutputFormat.RAW); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "key", testKey); + final String testKeyPrefix = RandomStringUtils.randomAlphabetic(4)+"_"; + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "generatedKeyPrefix", testKeyPrefix); + final String testUnits = RandomStringUtils.randomAlphabetic(3); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "units", testUnits); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "recordMinMax", true); + List testBuckets = new ArrayList(); + testBuckets.add(10.0); + setField(HistogramAggregateActionConfig.class, histogramAggregateActionConfig, "buckets", testBuckets); + AggregateAction aggregateAction = new HistogramAggregateAction(histogramAggregateActionConfig); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofMillis(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertTrue(aggregatedResultReceived); + assertThat(aggregatedResults.size(), equalTo(1)); + 
assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + @Test + public void testPercentSamplerAggregation() throws Exception { + double testPercent = 50.0; + PercentSamplerAggregateActionConfig percentSamplerAggregateActionConfig = new PercentSamplerAggregateActionConfig(); + setField(PercentSamplerAggregateActionConfig.class, percentSamplerAggregateActionConfig, "percent", testPercent); + AggregateAction aggregateAction = new PercentSamplerAggregateAction(percentSamplerAggregateActionConfig); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertFalse(aggregatedResultReceived); + assertThat(aggregatedResults.size(), greaterThanOrEqualTo(1)); + assertThat(aggregatedResults.size(), lessThan(5)); + assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + + @Test + public void testPutAllAggregation() throws Exception { + AggregateAction aggregateAction = new PutAllAggregateAction(); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertTrue(aggregatedResultReceived); + assertThat(aggregatedResults.size(), equalTo(1)); + assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + + @Test + public void testRateLimiterDropAggregation() throws Exception { + RateLimiterAggregateActionConfig rateLimiterAggregateActionConfig = mock(RateLimiterAggregateActionConfig.class); + final int eventsPerSecond = 1; + when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(eventsPerSecond); 
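+        // With a limit of one event per second in DROP mode, only a single event from the batch is
+        // expected to reach the sink; the dropped events are still acknowledged because the rate
+        // limiter action does not hold them, so the acknowledgement set is asserted to complete.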
+ when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.DROP); + AggregateAction aggregateAction = new RateLimiterAggregateAction(rateLimiterAggregateActionConfig); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertFalse(aggregatedResultReceived); + assertThat(aggregatedResults.size(), equalTo(1)); + assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + @Test + public void testRemoveDuplicatesAggregation() { + AggregateAction aggregateAction = new RemoveDuplicatesAggregateAction(); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertFalse(aggregatedResultReceived); + assertThat(aggregatedResults.size(), equalTo(1)); + assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + @Test + public void testRateLimiterNoDropAggregation() throws Exception { + RateLimiterAggregateActionConfig rateLimiterAggregateActionConfig = mock(RateLimiterAggregateActionConfig.class); + final int eventsPerSecond = 50; + when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(eventsPerSecond); + when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.BLOCK); + AggregateAction aggregateAction = new RateLimiterAggregateAction(rateLimiterAggregateActionConfig); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + final FutureHelperResult 
futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertFalse(aggregatedResultReceived); + assertThat(aggregatedResults.size(), equalTo(5)); + assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } + + + @Test + public void testRateLimiterNoDropAggregationWithMultipleAcknowledgementSets() throws Exception { + AcknowledgementSet acknowledgementSet2 = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); + AcknowledgementSet acknowledgementSet3 = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics)); + final List> records2 = getRecords(testKey, 1, acknowledgementSet2); + acknowledgementSet2.complete(); + final List> records3 = getRecords(testKey, 1, acknowledgementSet3); + acknowledgementSet3.complete(); + when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(List.of(testKey)); + + RateLimiterAggregateActionConfig rateLimiterAggregateActionConfig = mock(RateLimiterAggregateActionConfig.class); + final int eventsPerSecond = 50; + when(rateLimiterAggregateActionConfig.getEventsPerSecond()).thenReturn(eventsPerSecond); + when(rateLimiterAggregateActionConfig.getWhenExceeds()).thenReturn(RateLimiterMode.BLOCK); + AggregateAction aggregateAction = new RateLimiterAggregateAction(rateLimiterAggregateActionConfig); + when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE)); + when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))) + .thenReturn(aggregateAction); + buffer = mock(Buffer.class); + when(buffer.isEmpty()).thenReturn(true); + when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenAnswer(a -> { + if (callCount == 0) { + callCount++; + return Map.entry(records, checkpointState); + } else if (callCount == 1) { + callCount++; + return Map.entry(records2, checkpointState); + } else if (callCount == 2) { + callCount++; + return Map.entry(records3, checkpointState); + } else { + return Map.entry(List.of(), checkpointState); + } + }); + + final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator); + processors = List.of(processor); + final FutureHelperResult futureHelperResult = mock(FutureHelperResult.class); + when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList()); + try (final MockedStatic futureHelperMockedStatic = mockStatic(FutureHelper.class)) { + futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures)) + .thenReturn(futureHelperResult); + + final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline); + + processWorker.run(); + } + await().atMost(TEST_TIMEOUT) + .untilAsserted(() -> { + assertFalse(aggregatedResultReceived); + assertThat(aggregatedResults.size(), equalTo(5)); + assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone()); + }); + } 
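+    // Not called by the tests in this class: a minimal sketch (helper name assumed, not taken from
+    // this file) of the acknowledgement lifecycle they exercise. Input event handles are registered
+    // with the set and the set is completed; isDone() then reports true once every registered handle
+    // has been released (or the set times out), which is what each test waits for with Awaitility.
+    private void registerBatchForAcknowledgement(final List<Record<Event>> batch,
+                                                 final AcknowledgementSet ackSet) {
+        for (final Record<Event> record : batch) {
+            ackSet.add(record.getData().getEventHandle()); // track each input event
+        }
+        ackSet.complete(); // no further events will be added to this set
+    }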
+
+
+    @Test
+    public void testCountAggregationWithMultipleAcknowledgementSets() throws Exception {
+        AcknowledgementSet acknowledgementSet2 = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics));
+        AcknowledgementSet acknowledgementSet3 = new DefaultAcknowledgementSet(scheduledExecutorService, (result) -> {}, Duration.ofSeconds(10), new DefaultAcknowledgementSetMetrics(pluginMetrics));
+        final List<Record<Event>> records2 = getRecords(testKey, 1, acknowledgementSet2);
+        acknowledgementSet2.complete();
+        final List<Record<Event>> records3 = getRecords(testKey, 1, acknowledgementSet3);
+        acknowledgementSet3.complete();
+        when(aggregateProcessorConfig.getIdentificationKeys()).thenReturn(List.of(testKey));
+
+        CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig();
+        setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW);
+        AggregateAction aggregateAction = new CountAggregateAction(countAggregateActionConfig);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
+                .thenReturn(aggregateAction);
+        callCount = 0;
+        buffer = mock(Buffer.class);
+        when(buffer.isEmpty()).thenReturn(true);
+        when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenAnswer(a -> {
+            if (callCount == 0) {
+                callCount++;
+                return Map.entry(records, checkpointState);
+            } else if (callCount == 1) {
+                callCount++;
+                return Map.entry(records2, checkpointState);
+            } else if (callCount == 2) {
+                callCount++;
+                return Map.entry(records3, checkpointState);
+            } else {
+                return Map.entry(List.of(), checkpointState);
+            }
+        });
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertTrue(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(1));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet2).isDone());
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet3).isDone());
+                });
+    }
+
+    @Test
+    public void testCountAggregation() throws Exception {
+        CountAggregateActionConfig countAggregateActionConfig = new CountAggregateActionConfig();
+        setField(CountAggregateActionConfig.class, countAggregateActionConfig, "outputFormat", OutputFormat.RAW);
+        AggregateAction aggregateAction = new CountAggregateAction(countAggregateActionConfig);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofMillis(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class)))
+                .thenReturn(aggregateAction);
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertTrue(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(1));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+    @Test
+    public void testTailSamplerAggregationWithNoErrors() throws Exception {
+        TailSamplerAggregateActionConfig tailSamplerAggregateActionConfig = mock(TailSamplerAggregateActionConfig.class);
+        final Duration testWaitPeriod = Duration.ofMillis(1);
+        final String testCondition = "/status == 2";
+        when(tailSamplerAggregateActionConfig.getPercent()).thenReturn(100);
+        when(tailSamplerAggregateActionConfig.getWaitPeriod()).thenReturn(testWaitPeriod);
+        when(tailSamplerAggregateActionConfig.getCondition()).thenReturn(testCondition);
+        when(expressionEvaluator.evaluateConditional(eq(testCondition), any(Event.class))).thenReturn(false);
+        AggregateAction aggregateAction = new TailSamplerAggregateAction(tailSamplerAggregateActionConfig, expressionEvaluator);
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))).thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertFalse(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(5));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+
+
+    @Test
+    public void testTailSamplerAggregation() throws Exception {
+        TailSamplerAggregateActionConfig tailSamplerAggregateActionConfig = mock(TailSamplerAggregateActionConfig.class);
+        final Duration testWaitPeriod = Duration.ofMillis(1);
+        final String testCondition = "/status == 2";
+        when(tailSamplerAggregateActionConfig.getPercent()).thenReturn(50);
+        when(tailSamplerAggregateActionConfig.getWaitPeriod()).thenReturn(testWaitPeriod);
+        when(tailSamplerAggregateActionConfig.getCondition()).thenReturn(testCondition);
+        when(expressionEvaluator.evaluateConditional(eq(testCondition), any(Event.class))).thenReturn(true);
+        AggregateAction aggregateAction = new TailSamplerAggregateAction(tailSamplerAggregateActionConfig, expressionEvaluator);
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))).thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertFalse(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(5));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+    @Test
+    public void testAppendAggregation() throws Exception {
+        AppendAggregateActionConfig appendAggregateActionConfig = mock(AppendAggregateActionConfig.class);
+        when(appendAggregateActionConfig.getKeysToAppend()).thenReturn(List.of(testKey));
+        AggregateAction aggregateAction = new AppendAggregateAction(appendAggregateActionConfig);
+        when(pluginFactory.loadPlugin(eq(AggregateAction.class), any(PluginSetting.class))).thenReturn(aggregateAction);
+        when(aggregateProcessorConfig.getGroupDuration()).thenReturn(Duration.ofSeconds(GROUP_DURATION_FOR_ONLY_SINGLE_CONCLUDE));
+        final Processor processor = new AggregateProcessor(aggregateProcessorConfig, pluginMetrics, pluginFactory, expressionEvaluator);
+        processors = List.of(processor);
+
+        final FutureHelperResult<Void> futureHelperResult = mock(FutureHelperResult.class);
+        when(futureHelperResult.getFailedReasons()).thenReturn(Collections.emptyList());
+        try (final MockedStatic<FutureHelper> futureHelperMockedStatic = mockStatic(FutureHelper.class)) {
+            futureHelperMockedStatic.when(() -> FutureHelper.awaitFuturesIndefinitely(sinkFutures))
+                    .thenReturn(futureHelperResult);
+
+            final ProcessWorker processWorker = new ProcessWorker(buffer, processors, pipeline);
+
+            processWorker.run();
+        }
+        await().atMost(TEST_TIMEOUT)
+                .untilAsserted(() -> {
+                    assertTrue(aggregatedResultReceived);
+                    assertThat(aggregatedResults.size(), equalTo(1));
+                    assertTrue(((DefaultAcknowledgementSet)acknowledgementSet).isDone());
+                });
+    }
+
+
+    private List<Record<Event>> getRecords(String key, int value, AcknowledgementSet ackSet) {
+        final List<Record<Event>> events = new ArrayList<>();
+        final Map<String, Object> eventMap = Map.of(key, value);
+
+        for (int i = 0; i < NUM_EVENTS_PER_BATCH; i++) {
+            final Event event = JacksonEvent.builder()
+                    .withEventType("event")
+                    .withData(eventMap)
+                    .build();
+            events.add(new Record<>(event));
+            ackSet.add(event);
+        }
+        return events;
+    }
+
+}
+