Skip to content

Commit

Permalink
Use dedicated scheduler for locator recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Dec 12, 2024
1 parent 0b92a79 commit bedf745
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 78 deletions.
48 changes: 24 additions & 24 deletions .github/workflows/test-pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,38 +11,38 @@ jobs:

steps:
- uses: actions/checkout@v4
# - name: Checkout tls-gen
# uses: actions/checkout@v4
# with:
# repository: rabbitmq/tls-gen
# path: './tls-gen'
- name: Checkout tls-gen
uses: actions/checkout@v4
with:
repository: rabbitmq/tls-gen
path: './tls-gen'
- name: Set up JDK
uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: '21'
cache: 'maven'
# - name: Start broker
# run: ci/start-broker.sh
# - name: Test (no dynamic-batch publishing)
# run: |
# ./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
# -Drabbitmq.stream.producer.dynamic.batch=false \
# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
# - name: Test (dynamic-batch publishing)
# run: |
# ./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \
# -Drabbitmq.stream.producer.dynamic.batch=true \
# -Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
# -Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
# -Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
# - name: Stop broker
# run: docker stop rabbitmq && docker rm rabbitmq
- name: Start broker
run: ci/start-broker.sh
- name: Test (no dynamic-batch publishing)
run: |
./mvnw verify -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=false \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
- name: Test (dynamic-batch publishing)
run: |
./mvnw test -Drabbitmqctl.bin=DOCKER:rabbitmq \
-Drabbitmq.stream.producer.dynamic.batch=true \
-Dca.certificate=./tls-gen/basic/result/ca_certificate.pem \
-Dclient.certificate=./tls-gen/basic/result/client_$(hostname)_certificate.pem \
-Dclient.key=./tls-gen/basic/result/client_$(hostname)_key.pem
- name: Stop broker
run: docker stop rabbitmq && docker rm rabbitmq
- name: Start cluster
run: ci/start-cluster.sh
- name: Test against cluster
run: ./mvnw test -Dtest="RecoveryClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0
run: ./mvnw test -Dtest="*ClusterTest" -Drabbitmqctl.bin=DOCKER:rabbitmq0
- name: Stop cluster
run: docker compose --file ci/cluster/docker-compose.yml down
21 changes: 14 additions & 7 deletions src/main/java/com/rabbitmq/stream/impl/StreamEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class StreamEnvironment implements Environment {

private final EventLoopGroup eventLoopGroup;
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService locatorReconnectionScheduledExecutorService;
private final boolean privateScheduleExecutorService;
private final Client.ClientParameters clientParametersPrototype;
private final List<Address> addresses;
Expand Down Expand Up @@ -235,17 +236,22 @@ class StreamEnvironment implements Environment {
maxProducersByConnection,
maxTrackingConsumersByConnection,
connectionNamingStrategy,
Utils.coordinatorClientFactory(this, producerNodeRetryDelay),
coordinatorClientFactory(this, producerNodeRetryDelay),
forceLeaderForProducers);
this.consumersCoordinator =
new ConsumersCoordinator(
this,
maxConsumersByConnection,
connectionNamingStrategy,
Utils.coordinatorClientFactory(this, consumerNodeRetryDelay),
coordinatorClientFactory(this, consumerNodeRetryDelay),
forceReplicaForConsumers,
Utils.brokerPicker());
this.offsetTrackingCoordinator = new OffsetTrackingCoordinator(this);

ThreadFactory threadFactory = threadFactory("rabbitmq-stream-environment-locator-scheduler-");
this.locatorReconnectionScheduledExecutorService =
Executors.newScheduledThreadPool(this.locators.size(), threadFactory);

ClientParameters clientParametersForInit = locatorParametersCopy();
Runnable locatorInitSequence =
() -> {
Expand Down Expand Up @@ -291,7 +297,7 @@ class StreamEnvironment implements Environment {
l,
connectionNamingStrategy,
clientFactory,
this.scheduledExecutorService,
this.locatorReconnectionScheduledExecutorService,
this.recoveryBackOffDelayPolicy,
l.label());
}
Expand Down Expand Up @@ -338,7 +344,7 @@ private ShutdownListener shutdownListener(
locator,
connectionNamingStrategy,
clientFactory,
this.scheduledExecutorService,
this.locatorReconnectionScheduledExecutorService,
delayPolicy,
label);
} else {
Expand Down Expand Up @@ -683,6 +689,9 @@ public void close() {
if (privateScheduleExecutorService) {
this.scheduledExecutorService.shutdownNow();
}
if (this.locatorReconnectionScheduledExecutorService != null) {
this.locatorReconnectionScheduledExecutorService.shutdownNow();
}
try {
if (this.eventLoopGroup != null
&& (!this.eventLoopGroup.isShuttingDown() || !this.eventLoopGroup.isShutdown())) {
Expand Down Expand Up @@ -904,9 +913,7 @@ TrackingConsumerRegistration registerTrackingConsumer(
@Override
public String toString() {
return "{ \"locators\" : ["
+ this.locators.stream()
.map(l -> quote(l.label()))
.collect(Collectors.joining(","))
+ this.locators.stream().map(l -> quote(l.label())).collect(Collectors.joining(","))
+ "], "
+ Utils.jsonField("producer_client_count", this.producersCoordinator.clientCount())
+ ","
Expand Down
15 changes: 15 additions & 0 deletions src/test/java/com/rabbitmq/stream/Cli.java
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,15 @@ public static String rabbitmqctlCommand() {
}
}

private static String dockerContainer() {
if (rabbitmqctlCommand().startsWith("docker")) {
String rabbitmqCtl = System.getProperty("rabbitmqctl.bin");
return rabbitmqCtl.split(":")[1];
} else {
throw new IllegalStateException("Broker does not run on broker");
}
}

private static String rabbitmqStreamsCommand() {
String rabbitmqctl = rabbitmqctlCommand();
int lastIndex = rabbitmqctl.lastIndexOf("rabbitmqctl");
Expand Down Expand Up @@ -354,6 +363,12 @@ public static void restartNode(String node) {
executeCommand(dockerCommand + "rabbitmqctl status");
}

public static void restartBrokerContainer() {
String container = dockerContainer();
executeCommand("docker stop " + container);
executeCommand("docker start " + container);
}

public static void rebalance() {
rabbitmqQueues("rebalance all");
}
Expand Down
12 changes: 5 additions & 7 deletions src/test/java/com/rabbitmq/stream/impl/RecoveryClusterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,9 @@ public class RecoveryClusterTest {
static List<Level> logLevels;
static List<Class<?>> logClasses =
List.of(
// ProducersCoordinator.class,
// ConsumersCoordinator.class,
AsyncRetry.class,
StreamEnvironment.class,
ScheduledExecutorServiceWrapper.class);
// ProducersCoordinator.class,
// ConsumersCoordinator.class,
AsyncRetry.class, StreamEnvironment.class, ScheduledExecutorServiceWrapper.class);
ScheduledExecutorService scheduledExecutorService;

@BeforeAll
Expand Down Expand Up @@ -125,9 +123,9 @@ static void tearDownAll() {

@ParameterizedTest
@CsvSource({
// "false,false",
"false,false",
"true,true",
// "true,false",
"true,false",
})
void clusterRestart(boolean useLoadBalancer, boolean forceLeader) throws InterruptedException {
LOGGER.info(
Expand Down
20 changes: 2 additions & 18 deletions src/test/java/com/rabbitmq/stream/impl/StreamEnvironmentTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,9 @@
// [email protected].
package com.rabbitmq.stream.impl;

import static com.rabbitmq.stream.impl.TestUtils.*;
import static com.rabbitmq.stream.impl.TestUtils.CountDownLatchConditions.completed;
import static com.rabbitmq.stream.impl.TestUtils.ExceptionConditions.responseCode;
import static com.rabbitmq.stream.impl.TestUtils.latchAssert;
import static com.rabbitmq.stream.impl.TestUtils.localhost;
import static com.rabbitmq.stream.impl.TestUtils.localhostTls;
import static com.rabbitmq.stream.impl.TestUtils.streamName;
import static com.rabbitmq.stream.impl.TestUtils.waitAtMost;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static java.util.stream.IntStream.range;
Expand Down Expand Up @@ -57,7 +53,6 @@
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.handler.ssl.SslHandler;
import java.net.ConnectException;
import java.nio.charset.StandardCharsets;
Expand Down Expand Up @@ -92,22 +87,11 @@
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class StreamEnvironmentTest {

static EventLoopGroup eventLoopGroup;

EnvironmentBuilder environmentBuilder;

String stream;
TestUtils.ClientFactory cf;

@BeforeAll
static void initAll() {
eventLoopGroup = new NioEventLoopGroup();
}

@AfterAll
static void afterAll() throws Exception {
eventLoopGroup.shutdownGracefully(1, 10, SECONDS).get(10, SECONDS);
}
EventLoopGroup eventLoopGroup;

@BeforeEach
void init() {
Expand Down
22 changes: 0 additions & 22 deletions src/test/java/com/rabbitmq/stream/impl/TlsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,35 +56,13 @@
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLHandshakeException;
import javax.net.ssl.SSLParameters;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

@DisabledIfTlsNotEnabled
@ExtendWith(TestUtils.StreamTestInfrastructureExtension.class)
public class TlsTest {

static boolean isJava13() {
String javaVersion = System.getProperty("java.version");
return javaVersion != null && javaVersion.startsWith("13.");
}

@BeforeEach
public void init() {
if (isJava13()) {
// for Java 13.0.7, see https://github.com/bcgit/bc-java/issues/941
System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", "PBEWithHmacSHA256AndAES_256");
}
}

@AfterEach
public void tearDown() throws Exception {
if (isJava13()) {
System.setProperty("keystore.pkcs12.keyProtectionAlgorithm", "");
}
}

String stream;

TestUtils.ClientFactory cf;
Expand Down

0 comments on commit bedf745

Please sign in to comment.