diff --git a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java index 7cbc532e7e..99191cee2a 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ConsumersCoordinator.java @@ -576,8 +576,9 @@ private class ClientSubscriptionsManager implements Comparable(); // trackers and tracker count must be kept in sync private volatile List subscriptionTrackers = - new ArrayList<>(maxConsumersByConnection); - private volatile int trackerCount = 0; + createSubscriptionTrackerList(); + private final AtomicInteger consumerIndexSequence = new AtomicInteger(0); + private volatile int trackerCount; private final AtomicBoolean closed = new AtomicBoolean(false); private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientParameters) { @@ -585,7 +586,6 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa this.node = node; this.name = keyForClientSubscription(node); LOGGER.debug("creating subscription manager on {}", name); - IntStream.range(0, maxConsumersByConnection).forEach(i -> subscriptionTrackers.add(null)); this.trackerCount = 0; AtomicBoolean clientInitializedInManager = new AtomicBoolean(false); ChunkListener chunkListener = @@ -729,10 +729,9 @@ private ClientSubscriptionsManager(Broker node, Client.ClientParameters clientPa synchronized (this) { Set subscriptions = streamToStreamSubscriptions.remove(stream); if (subscriptions != null && !subscriptions.isEmpty()) { - List newSubscriptions = - new ArrayList<>(maxConsumersByConnection); - for (int i = 0; i < maxConsumersByConnection; i++) { - newSubscriptions.add(subscriptionTrackers.get(i)); + List newSubscriptions = createSubscriptionTrackerList(); + for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) { + newSubscriptions.set(i, subscriptionTrackers.get(i)); } for (SubscriptionTracker subscription : subscriptions) { LOGGER.debug( @@ -864,6 +863,12 @@ private void assignConsumersToStream( }); } + private List createSubscriptionTrackerList() { + List newSubscriptions = new ArrayList<>(MAX_SUBSCRIPTIONS_PER_CLIENT); + IntStream.range(0, MAX_SUBSCRIPTIONS_PER_CLIENT).forEach(i -> newSubscriptions.add(null)); + return newSubscriptions; + } + private void maybeRecoverSubscription(List candidates, SubscriptionTracker tracker) { if (tracker.compareAndSet(SubscriptionState.ACTIVE, SubscriptionState.RECOVERING)) { try { @@ -958,13 +963,7 @@ synchronized void add( checkNotClosed(); - byte subscriptionId = 0; - for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) { - if (subscriptionTrackers.get(i) == null) { - subscriptionId = (byte) i; - break; - } - } + byte subscriptionId = (byte) pickSlot(this.subscriptionTrackers, this.consumerIndexSequence); List previousSubscriptions = this.subscriptionTrackers; @@ -1121,15 +1120,14 @@ synchronized void remove(SubscriptionTracker subscriptionTracker) { } }); closeIfEmpty(); - // this.owner.maybeDisposeManager(this); } private List update( List original, byte index, SubscriptionTracker newValue) { - List newSubcriptions = new ArrayList<>(maxConsumersByConnection); + List newSubcriptions = createSubscriptionTrackerList(); int intIndex = index & 0xFF; - for (int i = 0; i < maxConsumersByConnection; i++) { - newSubcriptions.add(i == intIndex ? newValue : original.get(i)); + for (int i = 0; i < MAX_SUBSCRIPTIONS_PER_CLIENT; i++) { + newSubcriptions.set(i, i == intIndex ? newValue : original.get(i)); } return newSubcriptions; } @@ -1280,4 +1278,12 @@ public long messageCount() { return messageCount; } } + + static int pickSlot(List list, AtomicInteger sequence) { + int index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_SUBSCRIPTIONS_PER_CLIENT); + while (list.get(index) != null) { + index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_SUBSCRIPTIONS_PER_CLIENT); + } + return index; + } } diff --git a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java index ce7d4f0f06..143ef57204 100644 --- a/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java +++ b/src/main/java/com/rabbitmq/stream/impl/ProducersCoordinator.java @@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentSkipListSet; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -564,6 +565,7 @@ private class ClientProducersManager implements Comparable producers = new ConcurrentHashMap<>(maxProducersByClient); + private final AtomicInteger producerIndexSequence = new AtomicInteger(0); private final Set trackingConsumerTrackers = ConcurrentHashMap.newKeySet(maxTrackingConsumersByClient); private final Map> streamToTrackers = new ConcurrentHashMap<>(); @@ -802,33 +804,26 @@ private synchronized void register(AgentTracker tracker) { checkNotClosed(); if (tracker.identifiable()) { ProducerTracker producerTracker = (ProducerTracker) tracker; - // using the next available slot - for (int i = 0; i < maxProducersByClient; i++) { - ProducerTracker previousValue = producers.putIfAbsent((byte) i, producerTracker); - if (previousValue == null) { - this.checkNotClosed(); - int index = i; - Response response = - callAndMaybeRetry( - () -> - this.client.declarePublisher( - (byte) index, tracker.reference(), tracker.stream()), - RETRY_ON_TIMEOUT, - "Declare publisher request for publisher %d on stream '%s'", - producerTracker.uniqueId(), - producerTracker.stream()); - if (response.isOk()) { - tracker.assign((byte) i, this.client, this); - } else { - String message = - "Error while declaring publisher: " - + formatConstant(response.getResponseCode()) - + ". Could not assign producer to client."; - LOGGER.info(message); - throw new StreamException(message, response.getResponseCode()); - } - break; - } + int index = pickSlot(this.producers, producerTracker, this.producerIndexSequence); + this.checkNotClosed(); + Response response = + callAndMaybeRetry( + () -> + this.client.declarePublisher( + (byte) index, tracker.reference(), tracker.stream()), + RETRY_ON_TIMEOUT, + "Declare publisher request for publisher %d on stream '%s'", + producerTracker.uniqueId(), + producerTracker.stream()); + if (response.isOk()) { + tracker.assign((byte) index, this.client, this); + } else { + String message = + "Error while declaring publisher: " + + formatConstant(response.getResponseCode()) + + ". Could not assign producer to client."; + LOGGER.info(message); + throw new StreamException(message, response.getResponseCode()); } producers.put(tracker.id(), producerTracker); } else { @@ -944,4 +939,14 @@ public ClientClosedException() { super("Client already closed"); } } + + static int pickSlot(ConcurrentMap map, T tracker, AtomicInteger sequence) { + int index = -1; + T previousValue = tracker; + while (previousValue != null) { + index = Integer.remainderUnsigned(sequence.getAndIncrement(), MAX_PRODUCERS_PER_CLIENT); + previousValue = map.putIfAbsent((byte) index, tracker); + } + return index; + } } diff --git a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java index 06ce9f4ebc..3bd6d24025 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ConsumersCoordinatorTest.java @@ -15,6 +15,8 @@ package com.rabbitmq.stream.impl; import static com.rabbitmq.stream.BackOffDelayPolicy.fixedWithInitialDelay; +import static com.rabbitmq.stream.impl.ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT; +import static com.rabbitmq.stream.impl.ConsumersCoordinator.pickSlot; import static com.rabbitmq.stream.impl.TestUtils.b; import static com.rabbitmq.stream.impl.TestUtils.latchAssert; import static com.rabbitmq.stream.impl.TestUtils.metadata; @@ -45,12 +47,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; @@ -62,6 +59,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.ArgumentCaptor; import org.mockito.Captor; import org.mockito.Mock; @@ -1074,8 +1072,10 @@ void metadataUpdate_shouldCloseConsumerIfRetryTimeoutIsReached() throws Exceptio assertThat(coordinator.managerCount()).isZero(); } - @Test - void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscriptions() { + @ParameterizedTest + @ValueSource(ints = {50, ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT}) + void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscriptions( + int maxConsumersByConnection) { when(locator.metadata("stream")).thenReturn(metadata(leader(), null)); when(clientFactory.client(any())).thenReturn(client); @@ -1089,9 +1089,16 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip .thenReturn(new Client.Response(Constants.RESPONSE_CODE_OK)); when(client.isOpen()).thenReturn(true); - int extraSubscriptionCount = ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT / 5; - int subscriptionCount = - ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT + extraSubscriptionCount; + int extraSubscriptionCount = maxConsumersByConnection / 5; + int subscriptionCount = maxConsumersByConnection + extraSubscriptionCount; + + coordinator = + new ConsumersCoordinator( + environment, + maxConsumersByConnection, + type -> "consumer-connection", + clientFactory, + false); List closingRunnables = IntStream.range(0, subscriptionCount) @@ -1129,7 +1136,7 @@ void shouldUseNewClientsForMoreThanMaxSubscriptionsAndCloseClientAfterUnsubscrip verify(client, times(1)).close(); - closingRunnables.forEach(closingRunnable -> closingRunnable.run()); + closingRunnables.forEach(Runnable::run); verify(client, times(2)).close(); } @@ -1927,6 +1934,47 @@ void shouldRetryUntilReplicaIsAvailableWhenForceReplicaIsOn() throws Exception { assertThat(messageHandlerCalls.get()).isEqualTo(2); } + @Test + void pickSlotTest() { + List list = new ArrayList<>(ConsumersCoordinator.MAX_SUBSCRIPTIONS_PER_CLIENT); + IntStream.range(0, MAX_SUBSCRIPTIONS_PER_CLIENT).forEach(ignored -> list.add(null)); + AtomicInteger sequence = new AtomicInteger(0); + int index = pickSlot(list, sequence); + assertThat(index).isZero(); + list.set(index, "0"); + + index = pickSlot(list, sequence); + assertThat(index).isEqualTo(1); + list.set(index, "1"); + index = pickSlot(list, sequence); + assertThat(index).isEqualTo(2); + list.set(index, "2"); + index = pickSlot(list, sequence); + assertThat(index).isEqualTo(3); + list.set(index, "3"); + + list.set(1, null); + index = pickSlot(list, sequence); + assertThat(index).isEqualTo(4); + list.set(index, "4"); + + sequence.set(MAX_SUBSCRIPTIONS_PER_CLIENT - 2); + + index = pickSlot(list, sequence); + assertThat(index).isEqualTo(254); + list.set(index, "254"); + index = pickSlot(list, sequence); + assertThat(index).isEqualTo(255); + list.set(index, "255"); + + // 0 is already taken, so we should get index 1 when we overflow + index = pickSlot(list, sequence); + assertThat(index).isEqualTo(1); + list.set(index, "256"); + index = pickSlot(list, sequence); + assertThat(index).isEqualTo(5); + } + Client.Broker leader() { return new Client.Broker("leader", -1); } diff --git a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java index 50c711d12c..68ed0cd4dd 100644 --- a/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java +++ b/src/test/java/com/rabbitmq/stream/impl/ProducersCoordinatorTest.java @@ -14,6 +14,8 @@ // info@rabbitmq.com. package com.rabbitmq.stream.impl; +import static com.rabbitmq.stream.impl.ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT; +import static com.rabbitmq.stream.impl.ProducersCoordinator.pickSlot; import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed; import static com.rabbitmq.stream.impl.TestUtils.answer; import static com.rabbitmq.stream.impl.TestUtils.metadata; @@ -40,15 +42,15 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.stubbing.Answer; @@ -501,16 +503,25 @@ void shouldDisposeProducerAndNotTrackingConsumerIfMetadataUpdateTimesOut() throw assertThat(coordinator.clientCount()).isEqualTo(0); } - @Test - void growShrinkResourcesBasedOnProducersAndTrackingConsumersCount() { + @ParameterizedTest + @ValueSource(ints = {50, ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT}) + void growShrinkResourcesBasedOnProducersAndTrackingConsumersCount(int maxProducersByClient) { scheduledExecutorService = createScheduledExecutorService(); when(environment.scheduledExecutorService()).thenReturn(scheduledExecutorService); when(locator.metadata("stream")).thenReturn(metadata(leader(), replicas())); when(clientFactory.client(any())).thenReturn(client); - int extraProducerCount = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT / 5; - int producerCount = ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT + extraProducerCount; + int extraProducerCount = maxProducersByClient / 5; + int producerCount = maxProducersByClient + extraProducerCount; + + coordinator = + new ProducersCoordinator( + environment, + maxProducersByClient, + ProducersCoordinator.MAX_TRACKING_CONSUMERS_PER_CLIENT, + type -> "producer-connection", + clientFactory); class ProducerInfo { StreamProducer producer; @@ -578,6 +589,7 @@ class TrackingConsumerInfo { assertThat(coordinator.clientCount()).isEqualTo(2); + // we are closing one of the producers to check the next allocated publisher ID ProducerInfo info = producerInfos.get(10); info.cleaningCallback.run(); @@ -589,7 +601,12 @@ class TrackingConsumerInfo { coordinator.registerProducer(p, null, "stream"); verify(p, times(1)).setClient(client); - assertThat(publishingIdForNewProducer.get()).isEqualTo(info.publishingId); + // if the soft limit is less than the hard limit, publisher IDs keep going up + // if the soft limit is equal to the hard limit, we re-use the ID that has just been left + // available + int expectedPublishingId = + maxProducersByClient < MAX_PRODUCERS_PER_CLIENT ? maxProducersByClient : info.publishingId; + assertThat(publishingIdForNewProducer).hasValue((byte) expectedPublishingId); assertThat(coordinator.nodesConnected()).isEqualTo(1); assertThat(coordinator.clientCount()).isEqualTo(2); @@ -640,6 +657,28 @@ void producerShouldBeCreatedProperlyIfManagerClientIsRetried() throws Exception assertThat(setClientLatch).is(completed()); } + @Test + void pickSlotTest() { + ConcurrentMap map = new ConcurrentHashMap<>(); + AtomicInteger sequence = new AtomicInteger(0); + assertThat(pickSlot(map, "0", sequence)).isZero(); + assertThat(sequence).hasValue(1); + assertThat(map).hasSize(1); + assertThat(pickSlot(map, "1", sequence)).isEqualTo(1); + assertThat(pickSlot(map, "2", sequence)).isEqualTo(2); + assertThat(pickSlot(map, "3", sequence)).isEqualTo(3); + map.remove((byte) 1); + assertThat(pickSlot(map, "4", sequence)).isEqualTo(4); + assertThat(map).hasSize(4); + + sequence.set(ProducersCoordinator.MAX_PRODUCERS_PER_CLIENT - 2); + assertThat(pickSlot(map, "254", sequence)).isEqualTo(254); + assertThat(pickSlot(map, "255", sequence)).isEqualTo(255); + // 0 is already taken, so we should get index 1 when we overflow + assertThat(pickSlot(map, "256", sequence)).isEqualTo(1); + assertThat(pickSlot(map, "257", sequence)).isEqualTo(5); + } + private static ScheduledExecutorService createScheduledExecutorService() { return new ScheduledExecutorServiceWrapper(Executors.newSingleThreadScheduledExecutor()); }