Skip to content
This repository has been archived by the owner on May 4, 2019. It is now read-only.

Commit

Permalink
Merge branch 'release/1.6.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
Alan Scherger committed Apr 5, 2019
2 parents b033fd6 + 7573f2f commit 3c64a4b
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public DefaultProteusBrokerService(
this.addressSelector = addressSelector;
this.clientTransportFactory = clientTransportFactory;
this.poolSize = poolSize;
this.selectRefresh = poolSize / 2;
this.selectRefresh = poolSize == 1 ? 1 : poolSize / 2;
this.keepalive = keepalive;
this.tickPeriodSeconds = tickPeriodSeconds;
this.ackTimeoutSeconds = ackTimeoutSeconds;
Expand Down Expand Up @@ -182,7 +182,10 @@ private void useDiscoveryStrategy() {
discoveryStrategy
.discoverNodes()
.flatMapIterable(Function.identity())
.map(hostAndPort -> new InetSocketAddress(hostAndPort.getHost(), hostAndPort.getPort()))
.map(
hostAndPort ->
InetSocketAddress.createUnresolved(
hostAndPort.getHost(), hostAndPort.getPort()))
.collectList()
.doOnNext(
i -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ public static class Builder {
private int missedAcks = DefaultBuilderConfig.getMissedAcks();
private DiscoveryStrategy discoveryStrategy = null;
private Function<Broker, InetSocketAddress> addressSelector =
BrokerAddressSelectors.BIND_ADDRESS; // Default
BrokerAddressSelectors.TCP_ADDRESS; // Default

private Function<SocketAddress, ClientTransport> clientTransportFactory = null;
private int poolSize = Runtime.getRuntime().availableProcessors() * 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@
import java.util.function.Function;

public class BrokerAddressSelectors {

public static Function<Broker, InetSocketAddress> BIND_ADDRESS =
broker -> InetSocketAddress.createUnresolved(broker.getIpAddress(), broker.getPort());
public static Function<Broker, InetSocketAddress> TCP_ADDRESS =
broker -> InetSocketAddress.createUnresolved(broker.getTcpAddress(), broker.getTcpPort());
public static Function<Broker, InetSocketAddress> WEBSOCKET_ADDRESS =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public void testShouldDecrementActiveCountOnComplete() {
Broker b = Broker.newBuilder().setTcpAddress("localhost").setTcpPort(8001).build();
WeightedClientTransportSupplier supplier =
new WeightedClientTransportSupplier(
b, BrokerAddressSelectors.BIND_ADDRESS, address -> transport);
b, BrokerAddressSelectors.TCP_ADDRESS, address -> transport);

supplier.select();
DuplexConnection block = supplier.get().connect().block();
Expand Down

0 comments on commit 3c64a4b

Please sign in to comment.