From 3d6b1dc1d573c219c30b15c645d842a90c6ab05e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Wed, 11 Dec 2024 15:34:35 +0100 Subject: [PATCH] Use 1 initial credit in test --- .../com/rabbitmq/stream/impl/StreamConsumerTest.java | 11 ++++------- 1 file changed, 4 insertions(+), 7 deletions(-) diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java index be2dc4a8c3..b97ba4ba69 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamConsumerTest.java @@ -203,7 +203,7 @@ void consumeWithAsyncConsumerFlowControl() throws Exception { environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) .flow() - .strategy(creditWhenHalfMessagesProcessed()) + .strategy(creditWhenHalfMessagesProcessed(1)) .builder(); List messageContexts = synchronizedList(new ArrayList<>()); @@ -244,14 +244,13 @@ void asynchronousProcessingWithFlowControl() { int messageCount = 100_000; publishAndWaitForConfirms(cf, messageCount, stream); - ExecutorService executorService = - Executors.newFixedThreadPool(getRuntime().availableProcessors()); - try { + try (ExecutorService executorService = + Executors.newFixedThreadPool(getRuntime().availableProcessors())) { CountDownLatch latch = new CountDownLatch(messageCount); environment.consumerBuilder().stream(stream) .offset(OffsetSpecification.first()) .flow() - .strategy(creditWhenHalfMessagesProcessed()) + .strategy(creditWhenHalfMessagesProcessed(1)) .builder() .messageHandler( (ctx, message) -> @@ -262,8 +261,6 @@ void asynchronousProcessingWithFlowControl() { })) .build(); assertThat(latch).is(completed()); - } finally { - executorService.shutdownNow(); } }