Skip to content

Commit

Permalink
Merge pull request #478 from rabbitmq/allocate-producer-consumer-ids-…
Browse files Browse the repository at this point in the history
…in-sequence

Allocate producer/consumer IDs with a sequence
  • Loading branch information
acogoluegnes authored Dec 18, 2023
2 parents 8f5e9e8 + c973d01 commit 0d3f9e1
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 66 deletions.
42 changes: 24 additions & 18 deletions src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -576,16 +576,16 @@ private class ClientSubscriptionsManager implements Comparable<ClientSubscriptio
new ConcurrentHashMap<>();
// trackers and tracker count must be kept in sync
private volatile List<SubscriptionTracker> subscriptionTrackers =
new ArrayList<>(maxConsumersByConnection);
private volatile int trackerCount = 0;
createSubscriptionTrackerList();
private final AtomicInteger consumerIndexSequence = new AtomicInteger(0);
private volatile int trackerCount;
private final AtomicBoolean closed = new AtomicBoolean(false);

private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientParameters) {
this.id = managerIdSequence.getAndIncrement();
this.node = node;
this.name = keyForClientSubscription(node);
LOGGER.debug("creating subscription manager on {}", name);
IntStream.range(0, maxConsumersByConnection).forEach(i -> subscriptionTrackers.add(null));
this.trackerCount = 0;
AtomicBoolean clientInitializedInManager = new AtomicBoolean(false);
ChunkListener chunkListener =
Expand Down Expand Up @@ -729,10 +729,9 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa
synchronized (this) {
Set<SubscriptionTracker> subscriptions = streamToStreamSubscriptions.remove(stream);
if (subscriptions != null && !subscriptions.isEmpty()) {
List<SubscriptionTracker> newSubscriptions =
new ArrayList<>(maxConsumersByConnection);
for (int i = 0; i < maxConsumersByConnection; i++) {
newSubscriptions.add(subscriptionTrackers.get(i));
List<SubscriptionTracker> newSubscriptions = createSubscriptionTrackerList();
for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {
newSubscriptions.set(i, subscriptionTrackers.get(i));
}
for (SubscriptionTracker subscription : subscriptions) {
LOGGER.debug(
Expand Down Expand Up @@ -864,6 +863,12 @@ private void assignConsumersToStream(
});
}

private List<SubscriptionTracker> createSubscriptionTrackerList() {
List<SubscriptionTracker> newSubscriptions = new ArrayList<>(MAX_SUBSCRIPTIONS_PER_CLIENT);
IntStream.range(0, MAX_SUBSCRIPTIONS_PER_CLIENT).forEach(i -> newSubscriptions.add(null));
return newSubscriptions;
}

private void maybeRecoverSubscription(List<Broker> candidates, SubscriptionTracker tracker) {
if (tracker.compareAndSet(SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) {
try {
Expand Down Expand Up @@ -958,13 +963,7 @@ synchronized void add(

checkNotClosed();

byte subscriptionId = 0;
for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {
if (subscriptionTrackers.get(i) == null) {
subscriptionId = (byte) i;
break;
}
}
byte subscriptionId = (byte) pickSlot(this.subscriptionTrackers, this.consumerIndexSequence);

List<SubscriptionTracker> previousSubscriptions = this.subscriptionTrackers;

Expand Down Expand Up @@ -1121,15 +1120,14 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) {
}
});
closeIfEmpty();
// this.owner.maybeDisposeManager(this);
}

private List<SubscriptionTracker> update(
List<SubscriptionTracker> original, byte index, SubscriptionTracker newValue) {
List<SubscriptionTracker> newSubcriptions = new ArrayList<>(maxConsumersByConnection);
List<SubscriptionTracker> newSubcriptions = createSubscriptionTrackerList();
int intIndex = index & 0xFF;
for (int i = 0; i < maxConsumersByConnection; i++) {
newSubcriptions.add(i == intIndex ? newValue : original.get(i));
for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) {
newSubcriptions.set(i, i == intIndex ? newValue : original.get(i));
}
return newSubcriptions;
}
Expand Down Expand Up @@ -1280,4 +1278,12 @@ public long messageCount() {
return messageCount;
}
}

static <T> int pickSlot(List<T> list, AtomicInteger sequence) {
int index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_SUBSCRIPTIONS_PER_CLIENT);
while (list.get(index) != null) {
index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_SUBSCRIPTIONS_PER_CLIENT);
}
return index;
}
}
59 changes: 32 additions & 27 deletions src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
Expand Down Expand Up @@ -564,6 +565,7 @@ private class ClientProducersManager implements Comparable<ClientProducersManage
private final Broker node;
private final ConcurrentMap<Byte, ProducerTracker> producers =
new ConcurrentHashMap<>(maxProducersByClient);
private final AtomicInteger producerIndexSequence = new AtomicInteger(0);
private final Set<AgentTracker> trackingConsumerTrackers =
ConcurrentHashMap.newKeySet(maxTrackingConsumersByClient);
private final Map<String, Set<AgentTracker>> streamToTrackers = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -802,33 +804,26 @@ private synchronized void register(AgentTracker tracker) {
checkNotClosed();
if (tracker.identifiable()) {
ProducerTracker producerTracker = (ProducerTracker) tracker;
// using the next available slot
for (int i = 0; i < maxProducersByClient; i++) {
ProducerTracker previousValue = producers.putIfAbsent((byte) i, producerTracker);
if (previousValue == null) {
this.checkNotClosed();
int index = i;
Response response =
callAndMaybeRetry(
() ->
this.client.declarePublisher(
(byte) index, tracker.reference(), tracker.stream()),
RETRY_ON_TIMEOUT,
"Declare publisher request for publisher %d on stream '%s'",
producerTracker.uniqueId(),
producerTracker.stream());
if (response.isOk()) {
tracker.assign((byte) i, this.client, this);
} else {
String message =
"Error while declaring publisher: "
+ formatConstant(response.getResponseCode())
+ ". Could not assign producer to client.";
LOGGER.info(message);
throw new StreamException(message, response.getResponseCode());
}
break;
}
int index = pickSlot(this.producers, producerTracker, this.producerIndexSequence);
this.checkNotClosed();
Response response =
callAndMaybeRetry(
() ->
this.client.declarePublisher(
(byte) index, tracker.reference(), tracker.stream()),
RETRY_ON_TIMEOUT,
"Declare publisher request for publisher %d on stream '%s'",
producerTracker.uniqueId(),
producerTracker.stream());
if (response.isOk()) {
tracker.assign((byte) index, this.client, this);
} else {
String message =
"Error while declaring publisher: "
+ formatConstant(response.getResponseCode())
+ ". Could not assign producer to client.";
LOGGER.info(message);
throw new StreamException(message, response.getResponseCode());
}
producers.put(tracker.id(), producerTracker);
} else {
Expand Down Expand Up @@ -944,4 +939,14 @@ public ClientClosedException() {
super("Client already closed");
}
}

static <T> int pickSlot(ConcurrentMap<Byte, T> map, T tracker, AtomicInteger sequence) {
int index = -1;
T previousValue = tracker;
while (previousValue != null) {
index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_PRODUCERS_PER_CLIENT);
previousValue = map.putIfAbsent((byte) index, tracker);
}
return index;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay;
import static com.rabbitmq.stream.impl.ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT;
import static com.rabbitmq.stream.impl.ConsumersCoordinator.pickSlot;
import static com.rabbitmq.stream.impl.TestUtils.b;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static com.rabbitmq.stream.impl.TestUtils.metadata;
Expand Down Expand Up @@ -45,12 +47,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
Expand All @@ -62,6 +59,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
Expand Down Expand Up @@ -1074,8 +1072,10 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio
assertThat(coordinator.managerCount()).isZero();
}

@Test
void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscriptions() {
@ParameterizedTest
@ValueSource(ints = {50, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT})
void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscriptions(
int maxConsumersByConnection) {
when(locator.metadata("stream")).thenReturn(metadata(leader(), null));

when(clientFactory.client(any())).thenReturn(client);
Expand All @@ -1089,9 +1089,16 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip
.thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK));
when(client.isOpen()).thenReturn(true);

int extraSubscriptionCount = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT / 5;
int subscriptionCount =
ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + extraSubscriptionCount;
int extraSubscriptionCount = maxConsumersByConnection / 5;
int subscriptionCount = maxConsumersByConnection + extraSubscriptionCount;

coordinator =
new ConsumersCoordinator(
environment,
maxConsumersByConnection,
type -> "consumer-connection",
clientFactory,
false);

List<Runnable> closingRunnables =
IntStream.range(0, subscriptionCount)
Expand Down Expand Up @@ -1129,7 +1136,7 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip

verify(client, times(1)).close();

closingRunnables.forEach(closingRunnable -> closingRunnable.run());
closingRunnables.forEach(Runnable::run);

verify(client, times(2)).close();
}
Expand Down Expand Up @@ -1927,6 +1934,47 @@ void shouldRetryUntilReplicaIsAvailableWhenForceReplicaIsOn() throws Exception {
assertThat(messageHandlerCalls.get()).isEqualTo(2);
}

@Test
void pickSlotTest() {
List<String> list = new ArrayList<>(ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT);
IntStream.range(0, MAX_SUBSCRIPTIONS_PER_CLIENT).forEach(ignored -> list.add(null));
AtomicInteger sequence = new AtomicInteger(0);
int index = pickSlot(list, sequence);
assertThat(index).isZero();
list.set(index, "0");

index = pickSlot(list, sequence);
assertThat(index).isEqualTo(1);
list.set(index, "1");
index = pickSlot(list, sequence);
assertThat(index).isEqualTo(2);
list.set(index, "2");
index = pickSlot(list, sequence);
assertThat(index).isEqualTo(3);
list.set(index, "3");

list.set(1, null);
index = pickSlot(list, sequence);
assertThat(index).isEqualTo(4);
list.set(index, "4");

sequence.set(MAX_SUBSCRIPTIONS_PER_CLIENT - 2);

index = pickSlot(list, sequence);
assertThat(index).isEqualTo(254);
list.set(index, "254");
index = pickSlot(list, sequence);
assertThat(index).isEqualTo(255);
list.set(index, "255");

// 0 is already taken, so we should get index 1 when we overflow
index = pickSlot(list, sequence);
assertThat(index).isEqualTo(1);
list.set(index, "256");
index = pickSlot(list, sequence);
assertThat(index).isEqualTo(5);
}

Client.Broker leader() {
return new Client.Broker("leader", -1);
}
Expand Down
Loading

0 comments on commit 0d3f9e1

Please sign in to comment.