From 74d62e69e3317f717b857993e43dd206b07e4403 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Mon, 3 Jun 2024 14:06:03 +0300 Subject: [PATCH] Support domain names in known peers list (#3947) --- .../scala/com/wavesplatform/it/Docker.scala | 3 +- node/src/main/resources/application.conf | 10 +-- node/src/main/resources/network-defaults.conf | 17 +---- .../wavesplatform/network/NetworkServer.scala | 72 ++++++++++--------- .../network/PeerDatabaseImpl.scala | 50 ++++++------- .../settings/NetworkSettings.scala | 4 +- .../peer/PeerDatabaseImplSpecification.scala | 19 ----- .../NetworkSettingsSpecification.scala | 2 +- 8 files changed, 74 insertions(+), 103 deletions(-) diff --git a/node-it/src/test/scala/com/wavesplatform/it/Docker.scala b/node-it/src/test/scala/com/wavesplatform/it/Docker.scala index a5c6761987e..82114f86b93 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/Docker.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/Docker.scala @@ -305,7 +305,8 @@ class Docker( private def getNodeInfo(containerId: String, settings: WavesSettings): NodeInfo = { val restApiPort = settings.restAPISettings.port - val networkPort = settings.networkSettings.bindAddress.getPort + // assume test nodes always have an open port + val networkPort = settings.networkSettings.bindAddress.get.getPort val containerInfo = inspectContainer(containerId) val wavesIpAddress = containerInfo.networkSettings().networks().get(wavesNetwork.name()).ipAddress() diff --git a/node/src/main/resources/application.conf b/node/src/main/resources/application.conf index 1664080de5e..481512ecfc3 100644 --- a/node/src/main/resources/application.conf +++ b/node/src/main/resources/application.conf @@ -61,6 +61,11 @@ waves { # Peers and blacklist storage file file = ${waves.directory}"/peers.dat" + # If defined, the node will bind to this address and accept the incoming connections. When commented out, the node + # will not accept any incoming connections and will only establish outgoing ones. If you're using UPnP for port + # mapping, make sure to specify the correct address here. + bind-address = "0.0.0.0" + # String with IP address and port to send as external address during handshake. Could be set automatically if UPnP # is enabled. # @@ -68,8 +73,6 @@ waves { # listen to incoming connections on `bind-address:port` and broadcast its `declared-address` to its peers. UPnP # is supposed to be disabled in this scenario. # - # If declared address is not set and UPnP is not enabled, the node will not listen to incoming connections at all. - # # If declared address is not set and UPnP is enabled, the node will attempt to connect to an IGD, retrieve its # external IP address and configure the gateway to allow traffic through. If the node succeeds, the IGD's external # IP address becomes the node's declared address. @@ -79,9 +82,6 @@ waves { # to `bind-address:port`. Please note, however, that this setup is not recommended. # declared-address = "1.2.3.4:6863" - # Network address - bind-address = "0.0.0.0" - # Port number port = 6863 diff --git a/node/src/main/resources/network-defaults.conf b/node/src/main/resources/network-defaults.conf index 689557ac9bd..ac578b89815 100644 --- a/node/src/main/resources/network-defaults.conf +++ b/node/src/main/resources/network-defaults.conf @@ -5,11 +5,7 @@ waves.defaults { port = 6863 known-peers = [ - "159.69.126.149:6868" - "94.130.105.239:6868" - "159.69.126.153:6868" - "94.130.172.201:6868" - "35.157.247.122:6868" + "peers-testnet.wavesnodes.com:6868" ] } } @@ -140,10 +136,7 @@ waves.defaults { # node-name = "My MAINNET node" known-peers = [ - "168.119.116.189:6868" - "135.181.87.72:6868" - "162.55.39.115:6868" - "168.119.155.201:6868" + "peers.wavesnodes.com:6868" ] } @@ -160,11 +153,7 @@ waves.defaults { port = 6862 known-peers = [ - "88.99.185.128:6868" - "95.216.205.3:6868" - "49.12.15.166:6868" - "88.198.179.16:6868" - "52.58.254.101:6868" + "peers-stagenet.wavesnodes.com:6868" ] } } diff --git a/node/src/main/scala/com/wavesplatform/network/NetworkServer.scala b/node/src/main/scala/com/wavesplatform/network/NetworkServer.scala index a552fbaa037..f084da43695 100644 --- a/node/src/main/scala/com/wavesplatform/network/NetworkServer.scala +++ b/node/src/main/scala/com/wavesplatform/network/NetworkServer.scala @@ -4,6 +4,7 @@ import com.wavesplatform.Version import com.wavesplatform.metrics.Metrics import com.wavesplatform.network.MessageObserver.Messages import com.wavesplatform.settings.* +import com.wavesplatform.state.Cast import com.wavesplatform.transaction.* import com.wavesplatform.utils.ScorexLogging import io.netty.bootstrap.{Bootstrap, ServerBootstrap} @@ -16,7 +17,7 @@ import io.netty.util.concurrent.DefaultThreadFactory import monix.reactive.Observable import org.influxdb.dto.Point -import java.net.{InetSocketAddress, NetworkInterface} +import java.net.{InetSocketAddress, NetworkInterface, SocketAddress} import java.nio.channels.ClosedChannelException import java.util.concurrent.ConcurrentHashMap import scala.concurrent.duration.* @@ -59,17 +60,17 @@ object NetworkServer extends ScorexLogging { val trafficLogger = new TrafficLogger(settings.networkSettings.trafficLogger) val messageCodec = new MessageCodec(peerDatabase) - val excludedAddresses: Set[InetSocketAddress] = { - val bindAddress = settings.networkSettings.bindAddress - val isLocal = Option(bindAddress.getAddress).exists(_.isAnyLocalAddress) - val localAddresses = if (isLocal) { - NetworkInterface.getNetworkInterfaces.asScala - .flatMap(_.getInetAddresses.asScala.map(a => new InetSocketAddress(a, bindAddress.getPort))) - .toSet - } else Set(bindAddress) + val excludedAddresses: Set[InetSocketAddress] = + settings.networkSettings.bindAddress.fold(Set.empty[InetSocketAddress]) { bindAddress => + val isLocal = Option(bindAddress.getAddress).exists(_.isAnyLocalAddress) + val localAddresses = if (isLocal) { + NetworkInterface.getNetworkInterfaces.asScala + .flatMap(_.getInetAddresses.asScala.map(a => new InetSocketAddress(a, bindAddress.getPort))) + .toSet + } else Set(bindAddress) - localAddresses ++ settings.networkSettings.declaredAddress.toSet - } + localAddresses ++ settings.networkSettings.declaredAddress.toSet + } val lengthFieldPrepender = new LengthFieldPrepender(4) @@ -113,7 +114,7 @@ object NetworkServer extends ScorexLogging { fatalErrorHandler ) - val serverChannel = settings.networkSettings.declaredAddress.map { _ => + val serverChannel = settings.networkSettings.bindAddress.map { bindAddress => new ServerBootstrap() .group(bossGroup, workerGroup) .channel(classOf[NioServerSocketChannel]) @@ -128,7 +129,7 @@ object NetworkServer extends ScorexLogging { ) ++ pipelineTail ) ) - .bind(settings.networkSettings.bindAddress) + .bind(bindAddress) .channel() } @@ -166,6 +167,8 @@ object NetworkServer extends ScorexLogging { s"Channel closed: ${Option(closeFuture.cause()).map(_.getMessage).getOrElse("no message")}" ) ) + + logConnections() } def handleConnectionAttempt(remoteAddress: InetSocketAddress)(thisConnFuture: ChannelFuture): Unit = { @@ -188,6 +191,7 @@ object NetworkServer extends ScorexLogging { case other => log.debug(formatOutgoingChannelEvent(thisConnFuture.channel(), other.getMessage)) } } + logConnections() } def doConnect(remoteAddress: InetSocketAddress): Unit = @@ -201,36 +205,40 @@ object NetworkServer extends ScorexLogging { } ) - def scheduleConnectTask(): Unit = if (!shutdownInitiated) { - val delay = (if (peerConnectionsMap.isEmpty) AverageHandshakePeriod else 5.seconds) + - (Random.nextInt(1000) - 500).millis // add some noise so that nodes don't attempt to connect to each other simultaneously - log.trace(s"Next connection attempt in $delay") + def logConnections(): Unit = { + def mkAddressString(addresses: IterableOnce[SocketAddress]) = + addresses.iterator.map(_.toString).toVector.sorted.mkString("[", ",", "]") - workerGroup.schedule(delay) { - val outgoing = outgoingChannels.keySet.iterator().asScala.toVector + val incoming = peerInfo.values().asScala.view.map(_.remoteAddress).filterNot(outgoingChannels.containsKey) - def outgoingStr = outgoing.map(_.toString).sorted.mkString("[", ", ", "]") + lazy val incomingStr = mkAddressString(incoming) + lazy val outgoingStr = mkAddressString(outgoingChannels.keySet.iterator().asScala) - val all = peerInfo.values().iterator().asScala.flatMap(_.declaredAddress).toVector - val incoming = all.filterNot(outgoing.contains) + val all = peerInfo.values().iterator().asScala.flatMap(_.remoteAddress.cast[InetSocketAddress]) - def incomingStr = incoming.map(_.toString).sorted.mkString("[", ", ", "]") + log.trace(s"Outgoing: $outgoingStr ++ incoming: $incomingStr") - log.trace(s"Outgoing: $outgoingStr ++ incoming: $incomingStr") + Metrics.write( + Point + .measurement("connections") + .addField("outgoing", outgoingStr) + .addField("incoming", incomingStr) + .addField("n", all.size) + ) + } + + def scheduleConnectTask(): Unit = if (!shutdownInitiated) { + val delay = (if (peerConnectionsMap.isEmpty) AverageHandshakePeriod else 5.seconds) + + (Random.nextInt(1000) - 500).millis // add some noise so that nodes don't attempt to connect to each other simultaneously + + workerGroup.schedule(delay) { if (outgoingChannels.size() < settings.networkSettings.maxOutboundConnections) { + val all = peerInfo.values().iterator().asScala.flatMap(_.remoteAddress.cast[InetSocketAddress]) peerDatabase .randomPeer(excluded = excludedAddresses ++ all) .foreach(doConnect) } - Metrics.write( - Point - .measurement("connections") - .addField("outgoing", outgoingStr) - .addField("incoming", incomingStr) - .addField("n", all.size) - ) - scheduleConnectTask() } } diff --git a/node/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala b/node/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala index 99017da6630..2936a424727 100644 --- a/node/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala +++ b/node/src/main/scala/com/wavesplatform/network/PeerDatabaseImpl.scala @@ -1,8 +1,5 @@ package com.wavesplatform.network -import java.net.{InetAddress, InetSocketAddress} -import java.util.concurrent.TimeUnit - import com.google.common.cache.{CacheBuilder, RemovalNotification} import com.google.common.collect.EvictingQueue import com.wavesplatform.settings.NetworkSettings @@ -10,9 +7,12 @@ import com.wavesplatform.utils.{JsonFileStorage, ScorexLogging} import io.netty.channel.Channel import io.netty.channel.socket.nio.NioSocketChannel -import scala.jdk.CollectionConverters.* +import java.net.{InetAddress, InetSocketAddress} +import java.util.concurrent.TimeUnit +import scala.annotation.tailrec import scala.collection.* import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters.* import scala.util.Random import scala.util.control.NonFatal @@ -36,23 +36,12 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor } private type PeersPersistenceType = Set[String] - private val peersPersistence = cache[InetSocketAddress](settings.peersDataResidenceTime, Some(nonExpiringKnownPeers)) + private val peersPersistence = cache[InetSocketAddress](settings.peersDataResidenceTime) private val blacklist = cache[InetAddress](settings.blackListResidenceTime) private val suspension = cache[InetAddress](settings.suspensionResidenceTime) private val reasons = mutable.Map.empty[InetAddress, String] private val unverifiedPeers = EvictingQueue.create[InetSocketAddress](settings.maxUnverifiedPeers) - private val knownPeersAddresses = settings.knownPeers.map(inetSocketAddress(_, 6863)) - - private def nonExpiringKnownPeers(n: PeerRemoved[InetSocketAddress]): Unit = - if (n.wasEvicted() && knownPeersAddresses.contains(n.getKey)) - peersPersistence.put(n.getKey, n.getValue) - - for (a <- knownPeersAddresses) { - // add peers from config with max timestamp so they never get evicted from the list of known peers - doTouch(a, Long.MaxValue) - } - for (f <- settings.file if f.exists()) try { JsonFileStorage.load[PeersPersistenceType](f.getCanonicalPath).foreach(a => touch(inetSocketAddress(a, 6863))) log.info(s"Loaded ${peersPersistence.size} known peer(s) from ${f.getName}") @@ -62,7 +51,7 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor override def addCandidate(socketAddress: InetSocketAddress): Boolean = unverifiedPeers.synchronized { val r = !socketAddress.getAddress.isAnyLocalAddress && - !(socketAddress.getAddress.isLoopbackAddress && socketAddress.getPort == settings.bindAddress.getPort) && + !(socketAddress.getAddress.isLoopbackAddress && settings.bindAddress.exists(_.getPort == socketAddress.getPort)) && Option(peersPersistence.getIfPresent(socketAddress)).isEmpty && !unverifiedPeers.contains(socketAddress) if (r) unverifiedPeers.add(socketAddress) @@ -112,7 +101,7 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor override def suspendedHosts: immutable.Set[InetAddress] = suspension.asMap().asScala.keys.toSet override def detailedBlacklist: immutable.Map[InetAddress, (Long, String)] = - blacklist.asMap().asScala.view.mapValues(_.toLong).map { case ((h, t)) => h -> ((t, Option(reasons(h)).getOrElse(""))) }.toMap + blacklist.asMap().asScala.view.mapValues(_.toLong).map { case (h, t) => h -> ((t, Option(reasons(h)).getOrElse(""))) }.toMap override def detailedSuspended: immutable.Map[InetAddress, Long] = suspension.asMap().asScala.view.mapValues(_.toLong).toMap @@ -120,18 +109,21 @@ class PeerDatabaseImpl(settings: NetworkSettings) extends PeerDatabase with Scor def excludeAddress(isa: InetSocketAddress): Boolean = { excluded(isa) || Option(isa.getAddress).exists(blacklistedHosts) || suspendedHosts(isa.getAddress) } - // excluded only contains local addresses, our declared address, and external declared addresses we already have - // connection to, so it's safe to filter out all matching candidates - unverifiedPeers.removeIf(excluded(_)) - val unverified = Option(unverifiedPeers.peek()).filterNot(excludeAddress) - val verified = Random.shuffle(knownPeers.keySet.diff(excluded).toSeq).headOption.filterNot(excludeAddress) - - (unverified, verified) match { - case (Some(_), v @ Some(_)) => if (Random.nextBoolean()) Some(unverifiedPeers.poll()) else v - case (Some(_), None) => Some(unverifiedPeers.poll()) - case (None, v @ Some(_)) => v - case _ => None + + @tailrec + def nextUnverified(): Option[InetSocketAddress] = { + unverifiedPeers.poll() match { + case null => None + case nonNull => + if (!excludeAddress(nonNull)) Some(nonNull) else nextUnverified() + } } + + nextUnverified() orElse Random + .shuffle( + (knownPeers.keySet ++ settings.knownPeers.map(p => inetSocketAddress(p, 6868))).filterNot(excludeAddress) + ) + .headOption } def clearBlacklist(): Unit = { diff --git a/node/src/main/scala/com/wavesplatform/settings/NetworkSettings.scala b/node/src/main/scala/com/wavesplatform/settings/NetworkSettings.scala index e547185bf23..0624d0003bd 100644 --- a/node/src/main/scala/com/wavesplatform/settings/NetworkSettings.scala +++ b/node/src/main/scala/com/wavesplatform/settings/NetworkSettings.scala @@ -17,7 +17,7 @@ case class UPnPSettings(enable: Boolean, gatewayTimeout: FiniteDuration, discove case class NetworkSettings( file: Option[File], - bindAddress: InetSocketAddress, + bindAddress: Option[InetSocketAddress], declaredAddress: Option[InetSocketAddress], nodeName: String, nonce: Long, @@ -48,7 +48,7 @@ object NetworkSettings { private[this] def fromConfig(config: Config): NetworkSettings = { val file = config.getAs[File]("file") - val bindAddress = new InetSocketAddress(config.as[String]("bind-address"), config.as[Int]("port")) + val bindAddress = config.getAs[String]("bind-address").map(addr => new InetSocketAddress(addr, config.as[Int]("port"))) val nonce = config.getOrElse("nonce", randomNonce) val nodeName = config.getOrElse("node-name", s"Node-$nonce") require(nodeName.utf8Bytes.length <= MaxNodeNameBytesLength, s"Node name should have length less than $MaxNodeNameBytesLength bytes") diff --git a/node/src/test/scala/com/wavesplatform/network/peer/PeerDatabaseImplSpecification.scala b/node/src/test/scala/com/wavesplatform/network/peer/PeerDatabaseImplSpecification.scala index bb7715d09c1..831f65bf186 100644 --- a/node/src/test/scala/com/wavesplatform/network/peer/PeerDatabaseImplSpecification.scala +++ b/node/src/test/scala/com/wavesplatform/network/peer/PeerDatabaseImplSpecification.scala @@ -37,17 +37,6 @@ class PeerDatabaseImplSpecification extends FreeSpec { .resolve() private val settings2 = config2.as[NetworkSettings]("waves.network") - private val config3 = ConfigFactory - .parseString(s"""waves.network { - | file = null - | known-peers = ["$host1:1"] - | peers-data-residence-time = 2s - | enable-peers-exchange = no - |}""".stripMargin) - .withFallback(ConfigFactory.load()) - .resolve() - private val settings3 = config3.as[NetworkSettings]("waves.network") - private def withDatabase(settings: NetworkSettings)(f: PeerDatabase => Unit): Unit = { val pdb = new PeerDatabaseImpl(settings) f(pdb) @@ -78,14 +67,6 @@ class PeerDatabaseImplSpecification extends FreeSpec { database.randomPeer(Set()) shouldBe empty } - "known-peers should be always in database" in withDatabase(settings3) { database3 => - database3.knownPeers.keys should contain(address1) - sleepLong() - database3.knownPeers.keys should contain(address1) - sleepShort() - database3.knownPeers.keys should contain(address1) - } - "touching peer prevent it from obsoleting" in withDatabase(settings1) { database => database.addCandidate(address1) database.touch(address1) diff --git a/node/src/test/scala/com/wavesplatform/settings/NetworkSettingsSpecification.scala b/node/src/test/scala/com/wavesplatform/settings/NetworkSettingsSpecification.scala index 4c4c46f6fd9..b2a7acda6ce 100644 --- a/node/src/test/scala/com/wavesplatform/settings/NetworkSettingsSpecification.scala +++ b/node/src/test/scala/com/wavesplatform/settings/NetworkSettingsSpecification.scala @@ -42,7 +42,7 @@ class NetworkSettingsSpecification extends FlatSpec { |}""".stripMargin)) val networkSettings = config.as[NetworkSettings]("waves.network") - networkSettings.bindAddress should be(new InetSocketAddress("127.0.0.1", 6868)) + networkSettings.bindAddress should be(Some(new InetSocketAddress("127.0.0.1", 6868))) networkSettings.nodeName should be("default-node-name") networkSettings.declaredAddress should be(Some(new InetSocketAddress("127.0.0.1", 6868))) networkSettings.nonce should be(0)