Skip to content

Commit

Permalink
Add acknowledgement support to aggregate processor (#5139)
Browse files Browse the repository at this point in the history
* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments and added tests

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle errors

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed test errors by adding await

Signed-off-by: Krishna Kondaka <[email protected]>

* Addressed review comments

Signed-off-by: Krishna Kondaka <[email protected]>

* Removed unnecessary API

Signed-off-by: Krishna Kondaka <[email protected]>

* Fixed checkstyle error

Signed-off-by: Krishna Kondaka <[email protected]>

---------

Signed-off-by: Krishna Kondaka <[email protected]>
  • Loading branch information
kkondaka authored and dlvenable committed Jan 3, 2025
1 parent f784809 commit 1e6c0c8
Show file tree
Hide file tree
Showing 22 changed files with 874 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -587,7 +587,7 @@ public Builder<T> withTimeReceived(final Instant timeReceived) {
* @return returns the builder
* @since 2.10
*/
protected Builder<T> withEventHandle(final EventHandle eventHandle) {
public Builder<T> withEventHandle(final EventHandle eventHandle) {
this.eventHandle = eventHandle;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ public interface Processor<InputRecord extends Record<?>, OutputRecord extends R
*/
void prepareForShutdown();

/**
* @since 2.11
* Indicates if the processor holds the events or not
* Holding events indicates that the events are not ready to be released.
*/
default boolean holdsEvents() {
return false;
}

/**
* @since 1.2
* Returns true if the Processor's internal state is safe to be shutdown.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.processor;

import org.junit.jupiter.api.Test;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class ProcessorTest {

@Test
public void testDefault() {
Processor processor = mock(Processor.class);
when(processor.holdsEvents()).thenCallRealMethod();
assertThat(processor.holdsEvents(), equalTo(false));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ private void shutdownExecutorService(final ExecutorService executorService, fina
* @param records records that needs to published to each sink
* @return List of Future, each future for each sink
*/
List<Future<Void>> publishToSinks(final Collection<Record> records) {
public List<Future<Void>> publishToSinks(final Collection<Record> records) {
final int sinksSize = sinks.size();
final List<Future<Void>> sinkFutures = new ArrayList<>(sinksSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ private void doRun() {

try {
records = processor.execute(records);
if (inputEvents != null) {
// acknowledge missing events only if the processor is not holding events
if (!processor.holdsEvents() && inputEvents != null) {
processAcknowledgements(inputEvents, records);
}
} catch (final Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import org.mockito.Mock;
import org.mockito.MockedStatic;
import org.mockito.junit.jupiter.MockitoExtension;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.atLeast;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelper;
import org.opensearch.dataprepper.core.pipeline.common.FutureHelperResult;
import org.opensearch.dataprepper.model.CheckpointState;
Expand Down Expand Up @@ -132,6 +134,61 @@ void testProcessWorkerHappyPathWithAcknowledgments() {
}
}

@Test
void testProcessWorkerWithProcessorsNotHoldingEvents() {
DefaultEventHandle eventHandle = mock(DefaultEventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
when(eventHandle.release(true)).thenReturn(true);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(false);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);
processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, atLeast(1)).release(true);
}


@Test
void testProcessWorkerWithProcessorsHoldingEvents() {
EventHandle eventHandle = mock(EventHandle.class);
Event event = mock(Event.class);
Record record = mock(Record.class);
lenient().when(event.getEventHandle()).thenReturn(eventHandle);
when(record.getData()).thenReturn(event);
final List<Record> records = List.of(record);
final CheckpointState checkpointState = mock(CheckpointState.class);
final Map.Entry<Collection, CheckpointState> readResult = Map.entry(records, checkpointState);
when(buffer.read(pipeline.getReadBatchTimeoutInMillis())).thenReturn(readResult);

final Processor processor1 = mock(Processor.class);
when(processor1.holdsEvents()).thenReturn(true);
when(processor1.execute(records)).thenReturn(List.of());
when(processor1.isReadyForShutdown()).thenReturn(true);

processors = List.of(processor1);
when(source.areAcknowledgementsEnabled()).thenReturn(true);

final ProcessWorker processWorker = createObjectUnderTest();

processWorker.run();

verify(eventHandle, never()).release(true);
}

@Test
void testProcessWorkerWithProcessorThrowingExceptionIsCaughtProperly() {

Expand Down
1 change: 1 addition & 0 deletions data-prepper-plugins/aggregate-processor/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dependencies {
implementation project(':data-prepper-expression')
implementation project(':data-prepper-plugins:otel-proto-common')
implementation project(':data-prepper-plugins:otel-metrics-raw-processor')
testImplementation project(':data-prepper-core')
implementation libs.guava.core
implementation libs.commons.lang3
implementation libs.opentelemetry.proto
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Collections;

Expand All @@ -29,6 +30,14 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
return AggregateActionResponse.fromEvent(event);
}

/**
* indicates if the action holds the events or not
*
*/
default boolean holdsEvents() {
return false;
}

/**
* Concludes a group of Events
*
Expand All @@ -38,6 +47,12 @@ default AggregateActionResponse handleEvent(final Event event, final AggregateAc
* @since 1.3
*/
default AggregateActionOutput concludeGroup(final AggregateActionInput aggregateActionInput) {
if (aggregateActionInput != null) {
EventHandle eventHandle = aggregateActionInput.getEventHandle();
if (eventHandle != null) {
eventHandle.release(true);
}
}
return new AggregateActionOutput(Collections.emptyList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.EventHandle;

import java.util.Map;
import java.util.function.Function;
import java.time.Duration;
Expand All @@ -28,6 +30,12 @@ public interface AggregateActionInput {
*/
Map<Object, Object> getIdentificationKeys();

/**
* @return returns eventHandle held by the instance
* @since 2.11
*/
EventHandle getEventHandle();

/**
* Sets custom shouldConclude function
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@

package org.opensearch.dataprepper.plugins.processor.aggregate;

import org.opensearch.dataprepper.model.event.AggregateEventHandle;
import org.opensearch.dataprepper.model.event.InternalEventHandle;
import org.opensearch.dataprepper.model.event.EventHandle;
import org.opensearch.dataprepper.model.event.Event;

import java.time.Duration;
import java.time.Instant;
import java.util.function.Function;
Expand All @@ -19,13 +24,27 @@ class AggregateGroup implements AggregateActionInput {
private final Lock handleEventForGroupLock;
private final Map<Object, Object> identificationKeys;
private Function<Duration, Boolean> customShouldConclude;
private EventHandle eventHandle;

AggregateGroup(final Map<Object, Object> identificationKeys) {
this.groupState = new DefaultGroupState();
this.identificationKeys = identificationKeys;
this.groupStart = Instant.now();
this.concludeGroupLock = new ReentrantLock();
this.handleEventForGroupLock = new ReentrantLock();
this.eventHandle = new AggregateEventHandle(Instant.now());
}

@Override
public EventHandle getEventHandle() {
return eventHandle;
}

public void attachToEventAcknowledgementSet(Event event) {
InternalEventHandle internalEventHandle;
EventHandle handle = event.getEventHandle();
internalEventHandle = (InternalEventHandle)(handle);
internalEventHandle.addEventHandle(eventHandle);
}

public GroupState getGroupState() {
Expand Down Expand Up @@ -63,5 +82,6 @@ boolean shouldConcludeGroup(final Duration groupDuration) {
void resetGroup() {
groupStart = Instant.now();
groupState.clear();
this.eventHandle = new AggregateEventHandle(groupStart);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ AggregateGroup getAggregateGroup(final IdentificationKeysHasher.IdentificationKe
return allGroups.computeIfAbsent(identificationKeysMap, (hash) -> new AggregateGroup(identificationKeysMap.getKeyMap()));
}


List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> getGroupsToConclude(final boolean forceConclude) {
final List<Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup>> groupsToConclude = new ArrayList<>();
for (final Map.Entry<IdentificationKeysHasher.IdentificationKeysMap, AggregateGroup> groupEntry : allGroups.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ private AggregateAction loadAggregateAction(final PluginFactory pluginFactory) {
return pluginFactory.loadPlugin(AggregateAction.class, actionPluginSetting);
}

AggregateGroup getAggregateGroupForEvent(final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap, final Event event) {
AggregateGroup aggregateGroup = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
aggregateGroup.attachToEventAcknowledgementSet(event);
return aggregateGroup;
}

@Override
public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
final List<Record<Event>> recordsOut = new LinkedList<>();
Expand Down Expand Up @@ -124,7 +130,7 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
continue;
}
final IdentificationKeysHasher.IdentificationKeysMap identificationKeysMap = identificationKeysHasher.createIdentificationKeysMapFromEvent(event);
final AggregateGroup aggregateGroupForEvent = aggregateGroupManager.getAggregateGroup(identificationKeysMap);
final AggregateGroup aggregateGroupForEvent = getAggregateGroupForEvent(identificationKeysMap, event);

final AggregateActionResponse handleEventResponse = aggregateActionSynchronizer.handleEventForGroup(event, identificationKeysMap, aggregateGroupForEvent);

Expand All @@ -149,6 +155,11 @@ public Collection<Record<Event>> doExecute(Collection<Record<Event>> records) {
return recordsOut;
}

@Override
public boolean holdsEvents() {
return aggregateAction.holdsEvents();
}

public static long getTimeNanos(final Instant time) {
final long NANO_MULTIPLIER = 1_000 * 1_000 * 1_000;
long currentTimeNanos = time.getEpochSecond() * NANO_MULTIPLIER + time.getNano();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
return new AggregateActionOutput(List.of(event));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(groupState)
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
} else {
Integer countValue = (Integer)groupState.get(countKey);
Expand All @@ -168,6 +169,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withValue((double)countValue)
.withExemplars(List.of(exemplar))
.withAttributes(attr)
.withEventHandle(aggregateActionInput.getEventHandle())
.build(false);
event = (Event)sum;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(groupState)
.withEventHandle(aggregateActionInput.getEventHandle())
.build();
} else {
List<Double> explicitBoundsList = new ArrayList<Double>();
Expand Down Expand Up @@ -262,6 +263,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
.withExplicitBoundsList(explicitBoundsList)
.withExemplars(exemplarList)
.withAttributes(attr)
.withEventHandle(aggregateActionInput.getEventHandle())
.build(false);
event = (Event)histogram;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,5 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}
return AggregateActionResponse.nullEventResponse();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
final Event event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
.withData(aggregateActionInput.getGroupState())
.withEventHandle(aggregateActionInput.getEventHandle())
.build();

return new AggregateActionOutput(List.of(event));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,5 @@ public AggregateActionResponse handleEvent(final Event event, final AggregateAct
}
return new AggregateActionResponse(event);
}

}
Loading

0 comments on commit 1e6c0c8

Please sign in to comment.