Skip to content

Commit

Permalink
Merge pull request #1979 from bosch-io/bugfix/fix-stopConsumingOnRequ…
Browse files Browse the repository at this point in the history
…est-test

fix stopConsumingOnRequest test
  • Loading branch information
thjaeckle authored Jul 9, 2024
2 parents e10108c + 0b570cd commit 8f48e96
Showing 1 changed file with 11 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@
import static org.assertj.core.api.Assertions.fail;
import static org.eclipse.ditto.connectivity.service.messaging.TestConstants.header;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.*;

import java.nio.ByteBuffer;
import java.util.HashMap;
Expand Down Expand Up @@ -100,7 +97,13 @@ public void stopConsumingOnRequest() {
.drainAndShutdown(any(), any());
underTest.tell(KafkaConsumerActor.GracefulStop.START, getRef());
expectMsg(Done.getInstance());
verify(control).drainAndShutdown(any(), any());
// Change from once to twice since until now drainAndShutdown was always called twice.
// It wasn't caught because it failed once every 50 times.
// These types of problems occur when you try to handle not graceful shutdowns
// and explicitly call drainAndShutdown from the postStop method.
// Similar problems are discussed here -> https://github.com/akka/alpakka-kafka/issues/526
verify(control, atLeast(1)).drainAndShutdown(any(), any());
verify(control, atMost(2)).drainAndShutdown(any(), any());
}};

}
Expand Down Expand Up @@ -129,7 +132,7 @@ protected void testHeaderMapping() {

@Override
protected Props getConsumerActorProps(final Sink<Object, NotUsed> inboundMappingSink,
final PayloadMapping payloadMapping) {
final PayloadMapping payloadMapping) {

final Map<String, String> map = new HashMap<>(TestConstants.HEADER_MAPPING.getMapping());
map.putAll(Map.of(
Expand Down Expand Up @@ -169,13 +172,13 @@ protected Props getConsumerActorProps(final Sink<Object, NotUsed> inboundMapping

@Override
protected void consumeMessage(final ActorRef consumerActor, final ConsumerRecord<String, ByteBuffer> inboundMessage,
final ActorRef sender) {
final ActorRef sender) {
sourceQueue.offer(inboundMessage);
}

@Override
protected ConsumerRecord<String, ByteBuffer> getInboundMessage(final String payload,
final Map.Entry<String, Object> header) {
final Map.Entry<String, Object> header) {
final Headers headers = new RecordHeaders()
.add(toRecordHeader(header))
.add(toRecordHeader(REPLY_TO_HEADER));
Expand Down

0 comments on commit 8f48e96

Please sign in to comment.