From c2c96999a26f51c4696086bbf53b22c07d90789f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 26 Nov 2024 17:39:04 +0100 Subject: [PATCH 1/3] Allow to connect to a replica for producers This can speed up producer creation/recovery when a load balancer is used. --- src/docs/asciidoc/api.adoc | 5 + .../rabbitmq/stream/EnvironmentBuilder.java | 35 ++++- .../java/com/rabbitmq/stream/impl/Client.java | 5 +- .../stream/impl/ProducersCoordinator.java | 78 +++++++--- .../stream/impl/StreamEnvironment.java | 10 +- .../stream/impl/StreamEnvironmentBuilder.java | 26 +++- .../java/com/rabbitmq/stream/impl/Utils.java | 4 - .../stream/impl/LoadBalancerClusterTest.java | 146 ++++++++++++++++-- .../stream/impl/ProducersCoordinatorTest.java | 43 +++++- .../impl/StreamEnvironmentUnitTest.java | 17 +- 10 files changed, 312 insertions(+), 57 deletions(-) diff --git a/src/docs/asciidoc/api.adoc b/src/docs/asciidoc/api.adoc index f7edd8fffd..30af7553cd 100644 --- a/src/docs/asciidoc/api.adoc +++ b/src/docs/asciidoc/api.adoc @@ -220,6 +220,11 @@ The client retries 5 times before falling back to the stream leader node. Set to `true` only for clustered environments, not for 1-node environments, where only the stream leader is available. |`false` +|`forceLeaderForProducers` +|Force connecting to a stream leader for producers. +Set to `false` if it acceptable to stay connected to a stream replica when a load balancer is in use. +|`true` + |`id` |Informational ID for the environment instance. Used as a prefix for connection names. diff --git a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java index 57fe269ea5..64010d4d51 100644 --- a/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -354,7 +354,7 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy( *

Do not set this flag to true when streams have only 1 member (the leader), * e.g. for local development. * - *

Default is false. + *

Default is false. * * @param forceReplica whether to force the connection to a replica or not * @return this builder instance @@ -364,6 +364,37 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy( */ EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica); + /** + * Flag to force the connection to the stream leader for producers. + * + *

The library prefers to connect to a node that hosts a stream leader for producers (default + * behavior). + * + *

When using a load balancer, the library does not know in advance the node it connects to. It + * may have to retry to connect to the appropriate node. + * + *

It will retry until it connects to the appropriate node (flag set to true, the + * default). This provides the best data locality, but may require several attempts, delaying the + * creation or the recovery of producers. This usually suits high-throughput use cases. + * + *

The library will accept the connection to a stream replica if the flag is set to false + * . This will speed up the creation/recovery of producers, but at the cost of network hops + * between cluster nodes when publishing messages because only a stream leader accepts writes. + * This is usually acceptable for low-throughput use cases. + * + *

Changing the default value should only benefit systems where a load balancer sits between + * the client applications and the cluster nodes. + * + *

Default is true. + * + * @param forceLeader whether to force the connection to the leader or not + * @return this builder instance + * @see #recoveryBackOffDelayPolicy(BackOffDelayPolicy) + * @see #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy) + * @since 0.21.0 + */ + EnvironmentBuilder forceLeaderForProducers(boolean forceLeader); + /** * Create the {@link Environment} instance. * diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 5a169b2044..0931e2d20b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -2245,7 +2245,10 @@ public StreamMetadata(String stream, short responseCode, Broker leader, List connectionNamingStrategy, - ClientFactory clientFactory) { + ClientFactory clientFactory, + boolean forceLeader) { this.environment = environment; this.clientFactory = clientFactory; this.maxProducersByClient = maxProducersByClient; this.maxTrackingConsumersByClient = maxTrackingConsumersByClient; this.connectionNamingStrategy = connectionNamingStrategy; + this.forceLeader = forceLeader; } Runnable registerProducer(StreamProducer producer, String reference, String stream) { @@ -105,9 +109,10 @@ Runnable registerTrackingConsumer(StreamConsumer consumer) { } private Runnable registerAgentTracker(AgentTracker tracker, String stream) { - Client.Broker broker = getBrokerForProducer(stream); + List candidates = findCandidateNodes(stream, this.forceLeader); + Broker broker = pickBroker(candidates); - addToManager(broker, tracker); + addToManager(broker, candidates, tracker); if (DEBUG) { return () -> { @@ -125,7 +130,7 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) { } } - private void addToManager(Broker node, AgentTracker tracker) { + private void addToManager(Broker node, List candidates, AgentTracker tracker) { ClientParameters clientParameters = environment .clientParametersCopy() @@ -153,7 +158,8 @@ private void addToManager(Broker node, AgentTracker tracker) { if (pickedManager == null) { String name = keyForNode(node); LOGGER.debug("Trying to create producer manager on {}", name); - pickedManager = new ClientProducersManager(node, this.clientFactory, clientParameters); + pickedManager = + new ClientProducersManager(node, candidates, this.clientFactory, clientParameters); LOGGER.debug("Created producer manager on {}, id {}", name, pickedManager.id); } try { @@ -192,11 +198,12 @@ private void addToManager(Broker node, AgentTracker tracker) { } } - private Client.Broker getBrokerForProducer(String stream) { + // package protected for testing + List findCandidateNodes(String stream, boolean forceLeader) { Map metadata = this.environment.locatorOperation( namedFunction(c -> c.metadata(stream), "Candidate lookup to publish to '%s'", stream)); - if (metadata.size() == 0 || metadata.get(stream) == null) { + if (metadata.isEmpty() || metadata.get(stream) == null) { throw new StreamDoesNotExistException(stream); } @@ -210,17 +217,34 @@ private Client.Broker getBrokerForProducer(String stream) { } } + List candidates = new ArrayList<>(); Client.Broker leader = streamMetadata.getLeader(); - if (leader == null) { + if (leader == null && forceLeader) { throw new IllegalStateException("Not leader available for stream " + stream); } - LOGGER.debug( - "Using client on {}:{} to publish to {}", leader.getHost(), leader.getPort(), stream); + candidates.add(new BrokerWrapper(leader, true)); - return leader; + if (!forceLeader && !streamMetadata.getReplicas().isEmpty()) { + candidates.addAll( + streamMetadata.getReplicas().stream() + .map(b -> new BrokerWrapper(b, false)) + .collect(toList())); + } + + LOGGER.debug("Candidates to publish to {}: {}", stream, candidates); + + return Collections.unmodifiableList(candidates); + } + + static Broker pickBroker(List candidates) { + return candidates.stream() + .filter(BrokerWrapper::isLeader) + .findFirst() + .map(BrokerWrapper::broker) + .orElseThrow(() -> new IllegalStateException("Not leader available")); } - void close() { + public void close() { Iterator iterator = this.managers.iterator(); while (iterator.hasNext()) { ClientProducersManager manager = iterator.next(); @@ -568,7 +592,10 @@ private class ClientProducersManager implements Comparable candidates, + ClientFactory cf, + Client.ClientParameters clientParameters) { this.id = managerIdSequence.getAndIncrement(); AtomicReference nameReference = new AtomicReference<>(); AtomicReference ref = new AtomicReference<>(); @@ -682,7 +709,7 @@ private ClientProducersManager( .metadataListener(metadataListener) .clientProperty("connection_name", connectionName), keyForNode(targetNode), - Collections.emptyList()); + candidates.stream().map(BrokerWrapper::broker).collect(toList())); this.client = cf.client(connectionFactoryContext); this.node = Utils.brokerFromClient(this.client); this.name = keyForNode(this.node); @@ -694,18 +721,19 @@ private ClientProducersManager( private void assignProducersToNewManagers( Collection trackers, String stream, BackOffDelayPolicy delayPolicy) { - AsyncRetry.asyncRetry(() -> getBrokerForProducer(stream)) + AsyncRetry.asyncRetry(() -> findCandidateNodes(stream, forceLeader)) .description("Candidate lookup to publish to " + stream) .scheduler(environment.scheduledExecutorService()) .retry(ex -> !(ex instanceof StreamDoesNotExistException)) .delayPolicy(delayPolicy) .build() .thenAccept( - broker -> { + candidates -> { + Broker broker = pickBroker(candidates); String key = keyForNode(broker); LOGGER.debug( "Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key); - trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker)); + trackers.forEach(tracker -> maybeRecoverAgent(broker, candidates, tracker)); }) .exceptionally( ex -> { @@ -730,10 +758,11 @@ private void assignProducersToNewManagers( }); } - private void maybeRecoverAgent(Broker broker, AgentTracker tracker) { + private void maybeRecoverAgent( + Broker broker, List candidates, AgentTracker tracker) { if (tracker.markRecoveryInProgress()) { try { - recoverAgent(broker, tracker); + recoverAgent(broker, candidates, tracker); } catch (Exception e) { LOGGER.warn( "Error while recovering {} tracker {} (stream '{}'). Reason: {}", @@ -750,14 +779,14 @@ private void maybeRecoverAgent(Broker broker, AgentTracker tracker) { } } - private void recoverAgent(Broker node, AgentTracker tracker) { + private void recoverAgent(Broker node, List candidates, AgentTracker tracker) { boolean reassignmentCompleted = false; while (!reassignmentCompleted) { try { if (tracker.isOpen()) { LOGGER.debug( "Using {} to resume {} to {}", node.label(), tracker.type(), tracker.stream()); - addToManager(node, tracker); + addToManager(node, candidates, tracker); tracker.running(); } else { LOGGER.debug( @@ -776,14 +805,15 @@ private void recoverAgent(Broker node, AgentTracker tracker) { tracker.identifiable() ? tracker.id() : "N/A", tracker.stream()); // maybe not a good candidate, let's refresh and retry for this one - node = + candidates = Utils.callAndMaybeRetry( - () -> getBrokerForProducer(tracker.stream()), + () -> findCandidateNodes(tracker.stream(), forceLeader), ex -> !(ex instanceof StreamDoesNotExistException), environment.recoveryBackOffDelayPolicy(), "Candidate lookup for %s on stream '%s'", tracker.type(), tracker.stream()); + node = pickBroker(candidates); } catch (Exception e) { LOGGER.warn( "Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java index 3b66e0d3ca..3f7642cbec 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java @@ -102,7 +102,10 @@ class StreamEnvironment implements Environment { Function connectionNamingStrategy, Function clientFactory, ObservationCollector observationCollector, - boolean forceReplicaForConsumers) { + boolean forceReplicaForConsumers, + boolean forceLeaderForProducers, + Duration producerNodeRetryDelay, + Duration consumerNodeRetryDelay) { this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy; this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy; this.byteBufAllocator = byteBufAllocator; @@ -212,13 +215,14 @@ class StreamEnvironment implements Environment { maxProducersByConnection, maxTrackingConsumersByConnection, connectionNamingStrategy, - Utils.coordinatorClientFactory(this)); + Utils.coordinatorClientFactory(this, producerNodeRetryDelay), + forceLeaderForProducers); this.consumersCoordinator = new ConsumersCoordinator( this, maxConsumersByConnection, connectionNamingStrategy, - Utils.coordinatorClientFactory(this), + Utils.coordinatorClientFactory(this, consumerNodeRetryDelay), forceReplicaForConsumers, Utils.brokerPicker()); this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this); diff --git a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java index 3fc919aaba..0ec0a9dca2 100644 --- a/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java +++ b/src/main/java/com/rabbitmq/stream/impl/StreamEnvironmentBuilder.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -65,8 +65,11 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder { private CompressionCodecFactory compressionCodecFactory; private boolean lazyInit = false; private boolean forceReplicaForConsumers = false; + private boolean forceLeaderForProducers = true; private Function clientFactory = Client::new; private ObservationCollector observationCollector = ObservationCollector.NO_OP; + private Duration producerNodeRetryDelay = Duration.ofMillis(500); + private Duration consumerNodeRetryDelay = Duration.ofMillis(1000); public StreamEnvironmentBuilder() {} @@ -274,6 +277,12 @@ public EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica) { return this; } + @Override + public EnvironmentBuilder forceLeaderForProducers(boolean forceLeader) { + this.forceLeaderForProducers = forceLeader; + return this; + } + @Override public TlsConfiguration tls() { this.tls.enable(); @@ -296,6 +305,16 @@ public EnvironmentBuilder observationCollector(ObservationCollector observati return this; } + StreamEnvironmentBuilder producerNodeRetryDelay(Duration producerNodeRetryDelay) { + this.producerNodeRetryDelay = producerNodeRetryDelay; + return this; + } + + StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay) { + this.consumerNodeRetryDelay = consumerNodeRetryDelay; + return this; + } + @Override public Environment build() { if (this.compressionCodecFactory == null) { @@ -327,7 +346,10 @@ public Environment build() { connectionNamingStrategy, this.clientFactory, this.observationCollector, - this.forceReplicaForConsumers); + this.forceReplicaForConsumers, + this.forceLeaderForProducers, + this.producerNodeRetryDelay, + this.consumerNodeRetryDelay); } static final class DefaultTlsConfiguration implements TlsConfiguration { diff --git a/src/main/java/com/rabbitmq/stream/impl/Utils.java b/src/main/java/com/rabbitmq/stream/impl/Utils.java index 558d2a347b..f63ac5792b 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Utils.java +++ b/src/main/java/com/rabbitmq/stream/impl/Utils.java @@ -135,10 +135,6 @@ static short encodeResponseCode(Short code) { return (short) (code | 0B1000_0000_0000_0000); } - static ClientFactory coordinatorClientFactory(StreamEnvironment environment) { - return coordinatorClientFactory(environment, ConditionalClientFactory.RETRY_INTERVAL); - } - static ClientFactory coordinatorClientFactory( StreamEnvironment environment, Duration retryInterval) { String messageFormat = diff --git a/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java b/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java index 727ccc58e4..9d63f42fdc 100644 --- a/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/LoadBalancerClusterTest.java @@ -14,26 +14,28 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.impl.Assertions.assertThat; import static java.lang.Integer.parseInt; import static java.util.stream.Collectors.toSet; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; -import com.rabbitmq.stream.Address; -import com.rabbitmq.stream.ConsumerFlowStrategy; -import com.rabbitmq.stream.OffsetSpecification; -import com.rabbitmq.stream.SubscriptionListener; +import com.rabbitmq.stream.*; +import com.rabbitmq.stream.impl.Client.Broker; import com.rabbitmq.stream.impl.MonitoringTestUtils.ConsumerCoordinatorInfo; +import com.rabbitmq.stream.impl.MonitoringTestUtils.EnvironmentInfo; +import com.rabbitmq.stream.impl.MonitoringTestUtils.ProducersCoordinatorInfo; import com.rabbitmq.stream.impl.TestUtils.DisabledIfNotCluster; import io.netty.channel.EventLoopGroup; import java.time.Duration; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; -import java.util.stream.IntStream; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; @@ -49,11 +51,13 @@ public class LoadBalancerClusterTest { @Mock StreamEnvironment environment; @Mock StreamConsumer consumer; + @Mock StreamProducer producer; AutoCloseable mocks; TestUtils.ClientFactory cf; String stream; EventLoopGroup eventLoopGroup; Client locator; + Address loadBalancerAddress = new Address("localhost", LB_PORT); @BeforeEach void init() { @@ -62,7 +66,6 @@ void init() { when(environment.locator()).thenReturn(locator); when(environment.clientParametersCopy()) .thenReturn(new Client.ClientParameters().eventLoopGroup(eventLoopGroup).port(LB_PORT)); - Address loadBalancerAddress = new Address("localhost", LB_PORT); when(environment.addressResolver()).thenReturn(address -> loadBalancerAddress); when(environment.locatorOperation(any())).thenCallRealMethod(); } @@ -86,7 +89,7 @@ void pickConsumersAmongCandidates(boolean forceReplica) { forceReplica, Utils.brokerPicker())) { - IntStream.range(0, subscriptionCount) + range(0, subscriptionCount) .forEach( ignored -> { c.subscribe( @@ -102,22 +105,139 @@ void pickConsumersAmongCandidates(boolean forceReplica) { }); Client.StreamMetadata metadata = locator.metadata(stream).get(stream); - Set allowedNodes = new HashSet<>(metadata.getReplicas()); + Set allowedNodes = new HashSet<>(metadata.getReplicas()); if (!forceReplica) { allowedNodes.add(metadata.getLeader()); } ConsumerCoordinatorInfo info = MonitoringTestUtils.extract(c); assertThat(info.consumerCount()).isEqualTo(subscriptionCount); - Set usedNodes = + Set usedNodes = info.clients().stream() .map(m -> m.node().split(":")) - .map(np -> new Client.Broker(np[0], parseInt(np[1]))) + .map(np -> new Broker(np[0], parseInt(np[1]))) .collect(toSet()); assertThat(usedNodes).hasSameSizeAs(allowedNodes).containsAll(allowedNodes); } } + @Test + void pickProducersAmongCandidatesIfInstructed() { + boolean forceLeader = true; + when(consumer.stream()).thenReturn(stream); + int maxAgentPerClient = 2; + int agentCount = maxAgentPerClient * 10; + try (ProducersCoordinator c = + new ProducersCoordinator( + environment, + maxAgentPerClient, + maxAgentPerClient, + type -> "producer-connection", + Utils.coordinatorClientFactory(this.environment, Duration.ofMillis(10)), + forceLeader)) { + + range(0, agentCount) + .forEach( + ignored -> { + c.registerProducer(producer, null, stream); + c.registerTrackingConsumer(consumer); + }); + + Client.StreamMetadata metadata = locator.metadata(stream).get(stream); + Set allowedNodes = new HashSet<>(Collections.singleton(metadata.getLeader())); + if (!forceLeader) { + allowedNodes.addAll(metadata.getReplicas()); + } + + ProducersCoordinatorInfo info = MonitoringTestUtils.extract(c); + assertThat(info.producerCount()).isEqualTo(agentCount); + assertThat(info.trackingConsumerCount()).isEqualTo(agentCount); + Set usedNodes = + info.nodesConnected().stream() + .map(n -> n.split(":")) + .map(np -> new Broker(np[0], parseInt(np[1]))) + .collect(toSet()); + assertThat(usedNodes).hasSameSizeAs(allowedNodes).containsAll(allowedNodes); + } + } + + @ParameterizedTest + @ValueSource(booleans = {true, false}) + void producersConsumersShouldSpreadAccordingToDataLocalitySettings(boolean forceLocality) { + int maxPerConnection = 2; + int agentCount = maxPerConnection * 20; + StreamEnvironmentBuilder builder = (StreamEnvironmentBuilder) Environment.builder(); + builder + .producerNodeRetryDelay(Duration.ofMillis(10)) + .consumerNodeRetryDelay(Duration.ofMillis(10)); + try (Environment env = + builder + .port(LB_PORT) + .forceReplicaForConsumers(forceLocality) + .forceReplicaForConsumers(forceLocality) + .addressResolver(addr -> loadBalancerAddress) + .maxProducersByConnection(maxPerConnection) + .maxConsumersByConnection(maxPerConnection) + .forceLeaderForProducers(forceLocality) + .netty() + .eventLoopGroup(eventLoopGroup) + .environmentBuilder() + .build()) { + TestUtils.Sync consumeSync = TestUtils.sync(agentCount * agentCount); + Set consumersThatReceived = ConcurrentHashMap.newKeySet(agentCount); + List producers = new ArrayList<>(); + range(0, agentCount) + .forEach( + index -> { + producers.add(env.producerBuilder().stream(stream).build()); + env.consumerBuilder().stream(stream) + .messageHandler( + (ctx, msg) -> { + consumersThatReceived.add(index); + consumeSync.down(); + }) + .offset(OffsetSpecification.first()) + .build(); + }); + producers.forEach(p -> p.send(p.messageBuilder().build(), ctx -> {})); + assertThat(consumeSync).completes(); + assertThat(consumersThatReceived).containsAll(range(0, agentCount).boxed().collect(toSet())); + + EnvironmentInfo info = MonitoringTestUtils.extract(env); + ProducersCoordinatorInfo producerInfo = info.getProducers(); + ConsumerCoordinatorInfo consumerInfo = info.getConsumers(); + + assertThat(producerInfo.producerCount()).isEqualTo(agentCount); + assertThat(consumerInfo.consumerCount()).isEqualTo(agentCount); + + Client.StreamMetadata metadata = locator.metadata(stream).get(stream); + + Function, Set> toBrokers = + nodes -> + nodes.stream() + .map(n -> n.split(":")) + .map(n -> new Broker(n[0], parseInt(n[1]))) + .collect(toSet()); + Set usedNodes = toBrokers.apply(producerInfo.nodesConnected()); + assertThat(usedNodes).contains(metadata.getLeader()); + if (forceLocality) { + assertThat(usedNodes).hasSize(1); + } else { + assertThat(usedNodes).hasSize(metadata.getReplicas().size() + 1); + assertThat(usedNodes).containsAll(metadata.getReplicas()); + } + + usedNodes = toBrokers.apply(consumerInfo.nodesConnected()); + assertThat(usedNodes).containsAll(metadata.getReplicas()); + if (forceLocality) { + assertThat(usedNodes).hasSameSizeAs(metadata.getReplicas()); + } else { + assertThat(usedNodes).hasSize(metadata.getReplicas().size() + 1); + assertThat(usedNodes).contains(metadata.getLeader()); + } + } + } + private static ConsumerFlowStrategy flowStrategy() { return ConsumerFlowStrategy.creditOnChunkArrival(10); } diff --git a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java index dc89e5ec4c..176f5dbb13 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java @@ -19,6 +19,7 @@ import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed; import static com.rabbitmq.stream.impl.TestUtils.answer; import static com.rabbitmq.stream.impl.TestUtils.metadata; +import static java.util.stream.Collectors.toList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.ArgumentMatchers.any; @@ -79,6 +80,10 @@ static Client.Broker leader() { return new Client.Broker("leader", 5552); } + static Utils.BrokerWrapper leaderWrapper() { + return new Utils.BrokerWrapper(leader(), true); + } + static Client.Broker leader1() { return new Client.Broker("leader-1", 5552); } @@ -91,6 +96,10 @@ static List replicas() { return Arrays.asList(new Client.Broker("replica1", 5552), new Client.Broker("replica2", 5552)); } + static List replicaWrappers() { + return replicas().stream().map(b -> new Utils.BrokerWrapper(b, false)).collect(toList()); + } + @BeforeEach void init() { Client.ClientParameters clientParameters = @@ -125,7 +134,8 @@ public Client.ClientParameters metadataListener( ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT, ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, type -> "producer-connection", - clientFactory); + clientFactory, + true); when(client.isOpen()).thenReturn(true); when(client.deletePublisher(anyByte())).thenReturn(new Response(Constants.RESPONSE_CODE_OK)); } @@ -194,7 +204,8 @@ void registerShouldAllowPublishing() { ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT, ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, type -> "producer-connection", - cf); + cf, + true); when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas())); when(clientFactory.client(any())).thenReturn(client); @@ -221,7 +232,8 @@ void shouldGetExactNodeImmediatelyWithAdvertisedHostNameClientFactoryAndExactNod ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT, ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, type -> "producer-connection", - cf); + cf, + true); when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas())); when(clientFactory.client(any())).thenReturn(client); @@ -563,7 +575,8 @@ void growShrinkResourcesBasedOnProducersAndTrackingConsumersCount(int maxProduce maxProducersByClient, ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, type -> "producer-connection", - clientFactory); + clientFactory, + true); class ProducerInfo { StreamProducer producer; @@ -721,6 +734,28 @@ void pickSlotTest() { assertThat(pickSlot(map, "257", sequence)).isEqualTo(5); } + @Test + void findCandidateNodesShouldReturnOnlyLeaderWhenForceLeaderIsTrue() { + when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas())); + assertThat(coordinator.findCandidateNodes("stream", true)).containsOnly(leaderWrapper()); + } + + @Test + void findCandidateNodesShouldReturnLeaderAndReplicasWhenForceLeaderIsFalse() { + when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas())); + assertThat(coordinator.findCandidateNodes("stream", false)) + .hasSize(3) + .contains(leaderWrapper()) + .containsAll(replicaWrappers()); + } + + @Test + void findCandidateNodesShouldThrowIfThereIsNoLeader() { + when(locator.metadata("stream")).thenReturn(metadata(null, replicas())); + assertThatThrownBy(() -> coordinator.findCandidateNodes("stream", true)) + .isInstanceOf(IllegalStateException.class); + } + private static ScheduledExecutorService createScheduledExecutorService() { return new ScheduledExecutorServiceWrapper(Executors.newSingleThreadScheduledExecutor()); } diff --git a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java index c422d454a3..5724e0f6f9 100644 --- a/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentUnitTest.java @@ -1,4 +1,4 @@ -// Copyright (c) 2020-2023 Broadcom. All Rights Reserved. +// Copyright (c) 2020-2024 Broadcom. All Rights Reserved. // The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. // // This software, the RabbitMQ Stream Java client library, is dual-licensed under the @@ -98,7 +98,10 @@ Client.ClientParameters duplicate() { type -> "locator-connection", cf, ObservationCollector.NO_OP, - false); + false, + true, + Duration.ofMillis(100), + Duration.ofMillis(100)); } @AfterEach @@ -163,7 +166,10 @@ void shouldTryUrisOnInitializationFailure() throws Exception { type -> "locator-connection", cf, ObservationCollector.NO_OP, - false); + false, + true, + Duration.ofMillis(100), + Duration.ofMillis(100)); verify(cf, times(3)).apply(any(Client.ClientParameters.class)); } @@ -191,7 +197,10 @@ void shouldNotOpenConnectionWhenLazyInitIsEnabled( type -> "locator-connection", cf, ObservationCollector.NO_OP, - false); + false, + true, + Duration.ofMillis(100), + Duration.ofMillis(100)); verify(cf, times(expectedConnectionCreation)).apply(any(Client.ClientParameters.class)); } From 062a0948964d5209656add5eba83d00be94d8ad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 26 Nov 2024 17:55:49 +0100 Subject: [PATCH 2/3] Squash spotbugs warning --- src/main/java/com/rabbitmq/stream/impl/Client.java | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index 0931e2d20b..b09190b3ce 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -2263,6 +2263,7 @@ public Broker getLeader() { return leader; } + @SuppressFBWarnings("EI_EXPOSE_REP2") public List getReplicas() { return replicas; } From 5afeb53864caa20c2d15b6bbc7f7390c6a59d04d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arnaud=20Cogolu=C3=A8gnes?= Date: Tue, 26 Nov 2024 19:00:16 +0100 Subject: [PATCH 3/3] Return copy of replicas --- src/main/java/com/rabbitmq/stream/impl/Client.java | 7 +++++-- .../com/rabbitmq/stream/impl/ProducersCoordinator.java | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/rabbitmq/stream/impl/Client.java b/src/main/java/com/rabbitmq/stream/impl/Client.java index b09190b3ce..011b784d4d 100644 --- a/src/main/java/com/rabbitmq/stream/impl/Client.java +++ b/src/main/java/com/rabbitmq/stream/impl/Client.java @@ -2263,9 +2263,12 @@ public Broker getLeader() { return leader; } - @SuppressFBWarnings("EI_EXPOSE_REP2") public List getReplicas() { - return replicas; + return this.replicas.isEmpty() ? Collections.emptyList() : new ArrayList<>(this.replicas); + } + + boolean hasReplicas() { + return !this.replicas.isEmpty(); } public String getStream() { diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index 8656f232c1..0cd9038209 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -224,7 +224,7 @@ List findCandidateNodes(String stream, boolean forceLeader) { } candidates.add(new BrokerWrapper(leader, true)); - if (!forceLeader && !streamMetadata.getReplicas().isEmpty()) { + if (!forceLeader && streamMetadata.hasReplicas()) { candidates.addAll( streamMetadata.getReplicas().stream() .map(b -> new BrokerWrapper(b, false))