Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rebase neptune-work-in-progress from main #5307

Merged
merged 5 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ public Builder<T> withTimeReceived(final Instant timeReceived) {
* @return returns the builder
* @since 2.10
*/
protected Builder<T> withEventHandle(final EventHandle eventHandle) {
public Builder<T> withEventHandle(final EventHandle eventHandle) {
this.eventHandle = eventHandle;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface Processor<InputRecord extends Record<?>, OutputRecord extends R
*/
void prepareForShutdown();

/**
* @since 2.11
* Indicates if the processor holds the events or not
* Holding events indicates that the events are not ready to be released.
*/
default boolean holdsEvents() {
return false;
}

/**
* @since 1.2
* Returns true if the Processor's internal state is safe to be shutdown.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.processor;

import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ProcessorTest {

@Test
public void testDefault() {
Processor processor = mock(Processor.class);
when(processor.holdsEvents()).thenCallRealMethod();
assertThat(processor.holdsEvents(), equalTo(false));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private void shutdownExecutorService(final ExecutorService executorService, fina
* @param records records that needs to published to each sink
* @return List of Future, each future for each sink
*/
List<Future<Void>> publishToSinks(final Collection<Record> records) {
public List<Future<Void>> publishToSinks(final Collection<Record> records) {
final int sinksSize = sinks.size();
final List<Future<Void>> sinkFutures = new ArrayList<>(sinksSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private void doRun() {

try {
records = processor.execute(records);
if (inputEvents != null) {
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.atLeast;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.model.CheckpointState;
Expand Down Expand Up @@ -132,6 +134,61 @@ void testProcessWorkerHappyPathWithAcknowledgments() {
}
}

@Test
void testProcessWorkerWithProcessorsNotHoldingEvents() {
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
when(eventHandle.release(true)).thenReturn(true);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(false);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, atLeast(1)).release(true);
}


@Test
void testProcessWorkerWithProcessorsHoldingEvents() {
EventHandle eventHandle = mock(EventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(true);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);

processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, never()).release(true);
}

@Test
void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ void testGivenMatchingRuleThenReturnTrue() {
.getRuleIndex();

assertThat(genericNotOperator.shouldEvaluate(context), is(true));
assertThat(genericNotOperator.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void testShouldEvaluate() {
assertThat(objectUnderTest.shouldEvaluate(ctx), is(true));
when(ctx.getRuleIndex()).thenReturn(-1);
assertThat(objectUnderTest.shouldEvaluate(ctx), is(false));
assertThat(objectUnderTest.isBooleanOperator(), is(true));
}

@Test
Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/aggregate-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation project(':data-prepper-expression')
implementation project(':data-prepper-plugins:otel-proto-common')
implementation project(':data-prepper-plugins:otel-metrics-raw-processor')
testImplementation project(':data-prepper-core')
implementation libs.guava.core
implementation libs.commons.lang3
implementation libs.opentelemetry.proto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Collections;

Expand All @@ -29,6 +30,14 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
return AggregateActionResponse.fromEvent(event);
}

/**
* indicates if the action holds the events or not
*
*/
default boolean holdsEvents() {
return false;
}

/**
* Concludes a group of Events
*
Expand All @@ -38,6 +47,12 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
* @since 1.3
*/
default AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Map;
import java.util.function.Function;
import java.time.Duration;
Expand All @@ -28,6 +30,12 @@ public interface AggregateActionInput {
*/
Map<Object, Object> getIdentificationKeys();

/**
* @return returns eventHandle held by the instance
* @since 2.11
*/
EventHandle getEventHandle();

/**
* Sets custom shouldConclude function
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.AggregateEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.Event;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
Expand All @@ -19,13 +24,27 @@ class AggregateGroup implements AggregateActionInput {
private final Lock handleEventForGroupLock;
private final Map<Object, Object> identificationKeys;
private Function<Duration, Boolean> customShouldConclude;
private EventHandle eventHandle;

AggregateGroup(final Map<Object, Object> identificationKeys) {
this.groupState = new DefaultGroupState();
this.identificationKeys = identificationKeys;
this.groupStart = Instant.now();
this.concludeGroupLock = new ReentrantLock();
this.handleEventForGroupLock = new ReentrantLock();
this.eventHandle = new AggregateEventHandle(Instant.now());
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
}

public void attachToEventAcknowledgementSet(Event event) {
InternalEventHandle internalEventHandle;
EventHandle handle = event.getEventHandle();
internalEventHandle = (InternalEventHandle)(handle);
internalEventHandle.addEventHandle(eventHandle);
}

public GroupState getGroupState() {
Expand Down Expand Up @@ -63,5 +82,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) {
void resetGroup() {
groupStart = Instant.now();
groupState.clear();
this.eventHandle = new AggregateEventHandle(groupStart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKe
return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap()));
}


List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> getGroupsToConclude(final boolean forceConclude) {
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = new ArrayList<>();
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : allGroups.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting);
}

AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) {
AggregateGroup aggregateGroup = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
aggregateGroup.attachToEventAcknowledgementSet(event);
return aggregateGroup;
}

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final List<Record<Event>> recordsOut = new LinkedList<>();
Expand Down Expand Up @@ -124,7 +130,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
continue;
}
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap, event);

final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);

Expand All @@ -149,6 +155,11 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return recordsOut;
}

@Override
public boolean holdsEvents() {
return aggregateAction.holdsEvents();
}

public static long getTimeNanos(final Instant time) {
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano();
Expand Down
Loading
Loading