Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to connect to a replica for producers #662

Merged
merged 3 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/docs/asciidoc/api.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,11 @@ The client retries 5 times before falling back to the stream leader node.
Set to `true` only for clustered environments, not for 1-node environments, where only the stream leader is available.
|`false`

|`forceLeaderForProducers`
|Force connecting to a stream leader for producers.
Set to `false` if it is acceptable to stay connected to a stream replica when a load balancer is in use.
|`true`

|`id`
|Informational ID for the environment instance.
Used as a prefix for connection names.
Expand Down
35 changes: 33 additions & 2 deletions src/main/java/com/rabbitmq/stream/EnvironmentBuilder.java
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -354,7 +354,7 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
* <p><b>Do not set this flag to <code>true</code> when streams have only 1 member (the leader),
* e.g. for local development.</b>
*
* <p>Default is false.
* <p>Default is <code>false</code>.
*
* @param forceReplica whether to force the connection to a replica or not
* @return this builder instance
Expand All @@ -364,6 +364,37 @@ EnvironmentBuilder topologyUpdateBackOffDelayPolicy(
*/
EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica);

/**
* Flag to force the connection to the stream leader for producers.
*
* <p>The library prefers to connect to a node that hosts a stream leader for producers (default
* behavior).
*
* <p>When using a load balancer, the library does not know in advance the node it connects to. It
* may have to retry to connect to the appropriate node.
*
* <p>It will retry until it connects to the appropriate node (flag set to <code>true</code>, the
* default). This provides the best data locality, but may require several attempts, delaying the
* creation or the recovery of producers. This usually suits high-throughput use cases.
*
* <p>The library will accept the connection to a stream replica if the flag is set to <code>false
* </code>. This will speed up the creation/recovery of producers, but at the cost of network hops
* between cluster nodes when publishing messages because only a stream leader accepts writes.
* This is usually acceptable for low-throughput use cases.
*
* <p>Changing the default value should only benefit systems where a load balancer sits between
* the client applications and the cluster nodes.
*
* <p>Default is <code>true</code>.
*
* @param forceLeader <code>true</code> to retry until connected to the stream leader, <code>
*     false</code> to accept a connection to a stream replica
* @return this builder instance
* @see #recoveryBackOffDelayPolicy(BackOffDelayPolicy)
* @see #topologyUpdateBackOffDelayPolicy(BackOffDelayPolicy)
* @since 0.21.0
*/
EnvironmentBuilder forceLeaderForProducers(boolean forceLeader);

/**
* Create the {@link Environment} instance.
*
Expand Down
11 changes: 9 additions & 2 deletions src/main/java/com/rabbitmq/stream/impl/Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -2245,7 +2245,10 @@ public StreamMetadata(String stream, short responseCode, Broker leader, List<Bro
this.stream = stream;
this.responseCode = responseCode;
this.leader = leader;
this.replicas = replicas == null ? null : Collections.unmodifiableList(replicas);
this.replicas =
(replicas == null || replicas.isEmpty())
? Collections.emptyList()
: Collections.unmodifiableList(replicas);
}

public short getResponseCode() {
Expand All @@ -2261,7 +2264,11 @@ public Broker getLeader() {
}

public List<Broker> getReplicas() {
return replicas;
return this.replicas.isEmpty() ? Collections.emptyList() : new ArrayList<>(this.replicas);
}

/**
* Tell whether this metadata lists at least one replica.
*
* @return <code>true</code> if the replica list is not empty, <code>false</code> otherwise
*/
boolean hasReplicas() {
return !this.replicas.isEmpty();
}

public String getStream() {
Expand Down
78 changes: 54 additions & 24 deletions src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.Utils.*;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

import com.rabbitmq.stream.BackOffDelayPolicy;
Expand Down Expand Up @@ -49,7 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ProducersCoordinator {
final class ProducersCoordinator implements AutoCloseable {

static final int MAX_PRODUCERS_PER_CLIENT = 256;
static final int MAX_TRACKING_CONSUMERS_PER_CLIENT = 50;
Expand All @@ -67,18 +68,21 @@ class ProducersCoordinator {
new DefaultExecutorServiceFactory(
Runtime.getRuntime().availableProcessors(), 10, "rabbitmq-stream-producer-connection-");
private final Lock coordinatorLock = new ReentrantLock();
private final boolean forceLeader;

ProducersCoordinator(
StreamEnvironment environment,
int maxProducersByClient,
int maxTrackingConsumersByClient,
Function<ClientConnectionType, String> connectionNamingStrategy,
ClientFactory clientFactory) {
ClientFactory clientFactory,
boolean forceLeader) {
this.environment = environment;
this.clientFactory = clientFactory;
this.maxProducersByClient = maxProducersByClient;
this.maxTrackingConsumersByClient = maxTrackingConsumersByClient;
this.connectionNamingStrategy = connectionNamingStrategy;
this.forceLeader = forceLeader;
}

Runnable registerProducer(StreamProducer producer, String reference, String stream) {
Expand All @@ -105,9 +109,10 @@ Runnable registerTrackingConsumer(StreamConsumer consumer) {
}

private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
Client.Broker broker = getBrokerForProducer(stream);
List<BrokerWrapper> candidates = findCandidateNodes(stream, this.forceLeader);
Broker broker = pickBroker(candidates);

addToManager(broker, tracker);
addToManager(broker, candidates, tracker);

if (DEBUG) {
return () -> {
Expand All @@ -125,7 +130,7 @@ private Runnable registerAgentTracker(AgentTracker tracker, String stream) {
}
}

private void addToManager(Broker node, AgentTracker tracker) {
private void addToManager(Broker node, List<BrokerWrapper> candidates, AgentTracker tracker) {
ClientParameters clientParameters =
environment
.clientParametersCopy()
Expand Down Expand Up @@ -153,7 +158,8 @@ private void addToManager(Broker node, AgentTracker tracker) {
if (pickedManager == null) {
String name = keyForNode(node);
LOGGER.debug("Trying to create producer manager on {}", name);
pickedManager = new ClientProducersManager(node, this.clientFactory, clientParameters);
pickedManager =
new ClientProducersManager(node, candidates, this.clientFactory, clientParameters);
LOGGER.debug("Created producer manager on {}, id {}", name, pickedManager.id);
}
try {
Expand Down Expand Up @@ -192,11 +198,12 @@ private void addToManager(Broker node, AgentTracker tracker) {
}
}

private Client.Broker getBrokerForProducer(String stream) {
// package protected for testing
List<BrokerWrapper> findCandidateNodes(String stream, boolean forceLeader) {
Map<String, Client.StreamMetadata> metadata =
this.environment.locatorOperation(
namedFunction(c -> c.metadata(stream), "Candidate lookup to publish to '%s'", stream));
if (metadata.size() == 0 || metadata.get(stream) == null) {
if (metadata.isEmpty() || metadata.get(stream) == null) {
throw new StreamDoesNotExistException(stream);
}

Expand All @@ -210,17 +217,34 @@ private Client.Broker getBrokerForProducer(String stream) {
}
}

List<BrokerWrapper> candidates = new ArrayList<>();
Client.Broker leader = streamMetadata.getLeader();
if (leader == null) {
if (leader == null && forceLeader) {
throw new IllegalStateException("Not leader available for stream " + stream);
}
LOGGER.debug(
"Using client on {}:{} to publish to {}", leader.getHost(), leader.getPort(), stream);
candidates.add(new BrokerWrapper(leader, true));

return leader;
if (!forceLeader && streamMetadata.hasReplicas()) {
candidates.addAll(
streamMetadata.getReplicas().stream()
.map(b -> new BrokerWrapper(b, false))
.collect(toList()));
}

LOGGER.debug("Candidates to publish to {}: {}", stream, candidates);

return Collections.unmodifiableList(candidates);
}

/**
 * Pick the broker to connect to among the candidates.
 *
 * <p>Only the first candidate flagged as leader is considered. Its broker is returned if it
 * is not <code>null</code>.
 *
 * @param candidates the candidate nodes, in lookup order
 * @return the broker of the first leader candidate
 * @throws IllegalStateException if no candidate is flagged as leader or if the first leader
 *     candidate carries no broker
 */
static Broker pickBroker(List<BrokerWrapper> candidates) {
  for (BrokerWrapper candidate : candidates) {
    if (candidate.isLeader()) {
      Broker leader = candidate.broker();
      if (leader != null) {
        return leader;
      }
      // the first leader entry has no broker: do not look at later entries, fail below
      break;
    }
  }
  throw new IllegalStateException("Not leader available");
}

void close() {
public void close() {
Iterator<ClientProducersManager> iterator = this.managers.iterator();
while (iterator.hasNext()) {
ClientProducersManager manager = iterator.next();
Expand Down Expand Up @@ -568,7 +592,10 @@ private class ClientProducersManager implements Comparable<ClientProducersManage
private final AtomicBoolean closed = new AtomicBoolean(false);

private ClientProducersManager(
Broker targetNode, ClientFactory cf, Client.ClientParameters clientParameters) {
Broker targetNode,
List<BrokerWrapper> candidates,
ClientFactory cf,
Client.ClientParameters clientParameters) {
this.id = managerIdSequence.getAndIncrement();
AtomicReference<String> nameReference = new AtomicReference<>();
AtomicReference<Client> ref = new AtomicReference<>();
Expand Down Expand Up @@ -682,7 +709,7 @@ private ClientProducersManager(
.metadataListener(metadataListener)
.clientProperty("connection_name", connectionName),
keyForNode(targetNode),
Collections.emptyList());
candidates.stream().map(BrokerWrapper::broker).collect(toList()));
this.client = cf.client(connectionFactoryContext);
this.node = Utils.brokerFromClient(this.client);
this.name = keyForNode(this.node);
Expand All @@ -694,18 +721,19 @@ private ClientProducersManager(

private void assignProducersToNewManagers(
Collection<AgentTracker> trackers, String stream, BackOffDelayPolicy delayPolicy) {
AsyncRetry.asyncRetry(() -> getBrokerForProducer(stream))
AsyncRetry.asyncRetry(() -> findCandidateNodes(stream, forceLeader))
.description("Candidate lookup to publish to " + stream)
.scheduler(environment.scheduledExecutorService())
.retry(ex -> !(ex instanceof StreamDoesNotExistException))
.delayPolicy(delayPolicy)
.build()
.thenAccept(
broker -> {
candidates -> {
Broker broker = pickBroker(candidates);
String key = keyForNode(broker);
LOGGER.debug(
"Assigning {} producer(s) and consumer tracker(s) to {}", trackers.size(), key);
trackers.forEach(tracker -> maybeRecoverAgent(broker, tracker));
trackers.forEach(tracker -> maybeRecoverAgent(broker, candidates, tracker));
})
.exceptionally(
ex -> {
Expand All @@ -730,10 +758,11 @@ private void assignProducersToNewManagers(
});
}

private void maybeRecoverAgent(Broker broker, AgentTracker tracker) {
private void maybeRecoverAgent(
Broker broker, List<BrokerWrapper> candidates, AgentTracker tracker) {
if (tracker.markRecoveryInProgress()) {
try {
recoverAgent(broker, tracker);
recoverAgent(broker, candidates, tracker);
} catch (Exception e) {
LOGGER.warn(
"Error while recovering {} tracker {} (stream '{}'). Reason: {}",
Expand All @@ -750,14 +779,14 @@ private void maybeRecoverAgent(Broker broker, AgentTracker tracker) {
}
}

private void recoverAgent(Broker node, AgentTracker tracker) {
private void recoverAgent(Broker node, List<BrokerWrapper> candidates, AgentTracker tracker) {
boolean reassignmentCompleted = false;
while (!reassignmentCompleted) {
try {
if (tracker.isOpen()) {
LOGGER.debug(
"Using {} to resume {} to {}", node.label(), tracker.type(), tracker.stream());
addToManager(node, tracker);
addToManager(node, candidates, tracker);
tracker.running();
} else {
LOGGER.debug(
Expand All @@ -776,14 +805,15 @@ private void recoverAgent(Broker node, AgentTracker tracker) {
tracker.identifiable() ? tracker.id() : "N/A",
tracker.stream());
// maybe not a good candidate, let's refresh and retry for this one
node =
candidates =
Utils.callAndMaybeRetry(
() -> getBrokerForProducer(tracker.stream()),
() -> findCandidateNodes(tracker.stream(), forceLeader),
ex -> !(ex instanceof StreamDoesNotExistException),
environment.recoveryBackOffDelayPolicy(),
"Candidate lookup for %s on stream '%s'",
tracker.type(),
tracker.stream());
node = pickBroker(candidates);
} catch (Exception e) {
LOGGER.warn(
"Error while re-assigning {} (stream '{}')", tracker.type(), tracker.stream(), e);
Expand Down
10 changes: 7 additions & 3 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,10 @@ class StreamEnvironment implements Environment {
Function<ClientConnectionType, String> connectionNamingStrategy,
Function<Client.ClientParameters, Client> clientFactory,
ObservationCollector<?> observationCollector,
boolean forceReplicaForConsumers) {
boolean forceReplicaForConsumers,
boolean forceLeaderForProducers,
Duration producerNodeRetryDelay,
Duration consumerNodeRetryDelay) {
this.recoveryBackOffDelayPolicy = recoveryBackOffDelayPolicy;
this.topologyUpdateBackOffDelayPolicy = topologyBackOffDelayPolicy;
this.byteBufAllocator = byteBufAllocator;
Expand Down Expand Up @@ -212,13 +215,14 @@ class StreamEnvironment implements Environment {
maxProducersByConnection,
maxTrackingConsumersByConnection,
connectionNamingStrategy,
Utils.coordinatorClientFactory(this));
Utils.coordinatorClientFactory(this, producerNodeRetryDelay),
forceLeaderForProducers);
this.consumersCoordinator =
new ConsumersCoordinator(
this,
maxConsumersByConnection,
connectionNamingStrategy,
Utils.coordinatorClientFactory(this),
Utils.coordinatorClientFactory(this, consumerNodeRetryDelay),
forceReplicaForConsumers,
Utils.brokerPicker());
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright (c) 2020-2023 Broadcom. All Rights Reserved.
// Copyright (c) 2020-2024 Broadcom. All Rights Reserved.
// The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
//
// This software, the RabbitMQ Stream Java client library, is dual-licensed under the
Expand Down Expand Up @@ -65,8 +65,11 @@ public class StreamEnvironmentBuilder implements EnvironmentBuilder {
private CompressionCodecFactory compressionCodecFactory;
private boolean lazyInit = false;
private boolean forceReplicaForConsumers = false;
private boolean forceLeaderForProducers = true;
private Function<Client.ClientParameters, Client> clientFactory = Client::new;
private ObservationCollector<?> observationCollector = ObservationCollector.NO_OP;
private Duration producerNodeRetryDelay = Duration.ofMillis(500);
private Duration consumerNodeRetryDelay = Duration.ofMillis(1000);

public StreamEnvironmentBuilder() {}

Expand Down Expand Up @@ -274,6 +277,12 @@ public EnvironmentBuilder forceReplicaForConsumers(boolean forceReplica) {
return this;
}

@Override
public EnvironmentBuilder forceLeaderForProducers(boolean forceLeader) {
// only recorded here; the value is read back when build() assembles the environment
this.forceLeaderForProducers = forceLeader;
return this;
}

@Override
public TlsConfiguration tls() {
this.tls.enable();
Expand All @@ -296,6 +305,16 @@ public EnvironmentBuilder observationCollector(ObservationCollector<?> observati
return this;
}

/**
* Set the delay handed to the client factory of the producers coordinator.
*
* <p>Package-private, not part of the {@link EnvironmentBuilder} contract. NOTE(review):
* presumably the pause between attempts to reach a suitable node for producers, confirm
* against <code>Utils.coordinatorClientFactory</code>.
*
* @param producerNodeRetryDelay the delay to use
* @return this builder instance
*/
StreamEnvironmentBuilder producerNodeRetryDelay(Duration producerNodeRetryDelay) {
this.producerNodeRetryDelay = producerNodeRetryDelay;
return this;
}

/**
* Set the delay handed to the client factory of the consumers coordinator.
*
* <p>Package-private, not part of the {@link EnvironmentBuilder} contract. NOTE(review):
* presumably the pause between attempts to reach a suitable node for consumers, confirm
* against <code>Utils.coordinatorClientFactory</code>.
*
* @param consumerNodeRetryDelay the delay to use
* @return this builder instance
*/
StreamEnvironmentBuilder consumerNodeRetryDelay(Duration consumerNodeRetryDelay) {
this.consumerNodeRetryDelay = consumerNodeRetryDelay;
return this;
}

@Override
public Environment build() {
if (this.compressionCodecFactory == null) {
Expand Down Expand Up @@ -327,7 +346,10 @@ public Environment build() {
connectionNamingStrategy,
this.clientFactory,
this.observationCollector,
this.forceReplicaForConsumers);
this.forceReplicaForConsumers,
this.forceLeaderForProducers,
this.producerNodeRetryDelay,
this.consumerNodeRetryDelay);
}

static final class DefaultTlsConfiguration implements TlsConfiguration {
Expand Down
Loading
Loading