From aff12f8e06f8b6ca304a4cbe2ceade6e79fbc56c Mon Sep 17 00:00:00 2001 From: Le Karasique Date: Fri, 24 May 2019 10:31:31 +0300 Subject: [PATCH] NODE-1620: UTX packUnconfirmed limit by time (#2278) --- node/src/main/resources/application.conf | 3 +++ .../src/main/scala/com/wavesplatform/Importer.scala | 2 +- .../main/scala/com/wavesplatform/mining/Miner.scala | 4 ++-- .../com/wavesplatform/settings/MinerSettings.scala | 3 ++- .../main/scala/com/wavesplatform/utx/UtxPool.scala | 4 +++- .../scala/com/wavesplatform/utx/UtxPoolImpl.scala | 13 +++++++++---- .../mining/BlockWithMaxBaseTargetTest.scala | 2 +- .../settings/MinerSettingsSpecification.scala | 2 ++ .../wavesplatform/utx/UtxPoolSpecification.scala | 10 +++++----- scalastyle-config.xml | 2 +- 10 files changed, 29 insertions(+), 16 deletions(-) diff --git a/node/src/main/resources/application.conf b/node/src/main/resources/application.conf index 7ca06e59a51..ee9816f7af2 100644 --- a/node/src/main/resources/application.conf +++ b/node/src/main/resources/application.conf @@ -226,6 +226,9 @@ waves { # Minimal block generation offset minimal-block-generation-offset = 0 + + # Max packUnconfirmed time + max-pack-time = ${waves.miner.micro-block-interval} } # Node's REST API settings diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index 72afcc5864d..05c1d46612a 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -46,7 +46,7 @@ object Importer extends ScorexLogging { override def all = ??? override def size = ??? override def transactionById(transactionId: ByteStr) = ??? - override def packUnconfirmed(rest: MultiDimensionalMiningConstraint) = ??? + override def packUnconfirmed(rest: MultiDimensionalMiningConstraint, maxPackTime: Duration): (Seq[Transaction], MultiDimensionalMiningConstraint) = ??? override def close(): Unit = {} } diff --git a/node/src/main/scala/com/wavesplatform/mining/Miner.scala b/node/src/main/scala/com/wavesplatform/mining/Miner.scala index 3fa022888c2..d085ffd6af1 100644 --- a/node/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -140,7 +140,7 @@ class MinerImpl(allChannels: ChannelGroup, consensusData <- consensusData(height, account, lastBlock, refBlockBT, refBlockTS, balance, currentTime) estimators = MiningConstraints(blockchainUpdater, height, Some(minerSettings)) mdConstraint = MultiDimensionalMiningConstraint(estimators.total, estimators.keyBlock) - (unconfirmed, updatedMdConstraint) = metrics.measureLog("packing unconfirmed transactions for block")(utx.packUnconfirmed(mdConstraint)) + (unconfirmed, updatedMdConstraint) = metrics.measureLog("packing unconfirmed transactions for block")(utx.packUnconfirmed(mdConstraint, settings.minerSettings.maxPackTime)) _ = log.debug(s"Adding ${unconfirmed.size} unconfirmed transaction(s) to new block") block <- Block .buildAndSign(version.toByte, currentTime, refBlockID, consensusData, unconfirmed, account, blockFeatures(version)) @@ -177,7 +177,7 @@ class MinerImpl(allChannels: ChannelGroup, } else { val (unconfirmed, updatedTotalConstraint) = metrics.measureLog("packing unconfirmed transactions for microblock") { val mdConstraint = MultiDimensionalMiningConstraint(restTotalConstraint, constraints.micro) - val (unconfirmed, updatedMdConstraint) = utx.packUnconfirmed(mdConstraint) + val (unconfirmed, updatedMdConstraint) = utx.packUnconfirmed(mdConstraint, settings.minerSettings.maxPackTime) (unconfirmed, updatedMdConstraint.constraints.head) } diff --git a/node/src/main/scala/com/wavesplatform/settings/MinerSettings.scala b/node/src/main/scala/com/wavesplatform/settings/MinerSettings.scala index abb7197960e..60c55633f0e 100644 --- a/node/src/main/scala/com/wavesplatform/settings/MinerSettings.scala +++ b/node/src/main/scala/com/wavesplatform/settings/MinerSettings.scala @@ -12,6 +12,7 @@ case class MinerSettings(enable: Boolean, minimalBlockGenerationOffset: FiniteDuration, maxTransactionsInKeyBlock: Int, maxTransactionsInMicroBlock: Int, - minMicroBlockAge: FiniteDuration) { + minMicroBlockAge: FiniteDuration, + maxPackTime: FiniteDuration) { require(maxTransactionsInMicroBlock <= Miner.MaxTransactionsPerMicroblock) } diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala index 6f09272ade3..69cae3c4163 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala @@ -8,6 +8,8 @@ import com.wavesplatform.state.Portfolio import com.wavesplatform.transaction._ import com.wavesplatform.transaction.smart.script.trace.TracedResult +import scala.concurrent.duration.Duration + trait UtxPool extends AutoCloseable { self => @@ -25,6 +27,6 @@ trait UtxPool extends AutoCloseable { def transactionById(transactionId: ByteStr): Option[Transaction] - def packUnconfirmed(rest: MultiDimensionalMiningConstraint): (Seq[Transaction], MultiDimensionalMiningConstraint) + def packUnconfirmed(rest: MultiDimensionalMiningConstraint, maxPackTime: Duration): (Seq[Transaction], MultiDimensionalMiningConstraint) } diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala index 3428c326af4..791cf4cc3bb 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala @@ -30,6 +30,7 @@ import monix.execution.{Cancelable, Scheduler} import monix.reactive.{Observable, Observer} import scala.collection.JavaConverters._ +import scala.concurrent.duration.{Duration => ScalaDuration} import scala.util.{Left, Right} class UtxPoolImpl(time: Time, blockchain: Blockchain, spendableBalanceChanged: Observer[(Address, Asset)], utxSettings: UtxSettings) @@ -187,12 +188,16 @@ class UtxPoolImpl(time: Time, blockchain: Blockchain, spendableBalanceChanged: O override def transactionById(transactionId: ByteStr): Option[Transaction] = Option(transactions.get(transactionId)) - override def packUnconfirmed(rest: MultiDimensionalMiningConstraint): (Seq[Transaction], MultiDimensionalMiningConstraint) = { + override def packUnconfirmed(rest: MultiDimensionalMiningConstraint, maxPackTime: ScalaDuration): (Seq[Transaction], MultiDimensionalMiningConstraint) = { val differ = TransactionDiffer(blockchain.lastBlockTimestamp, time.correctedTime(), blockchain.height) _ val (reversedValidTxs, _, finalConstraint, _, _, totalIterations) = PoolMetrics.packTimeStats.measure { + val startTime = System.nanoTime() + def isTimeLimitReached: Boolean = maxPackTime.isFinite() && (System.nanoTime() - startTime) >= maxPackTime.toNanos + transactions.values.asScala.toSeq .sorted(TransactionsOrdering.InUTXPool) - .iterator .scanLeft((Seq.empty[Transaction], Monoid[Diff].empty, rest, false, rest, 0)) { + .iterator + .scanLeft((Seq.empty[Transaction], Monoid[Diff].empty, rest, false, rest, 0)) { case ((valid, diff, currRest, _, lastOverfilled, iterations), tx) => val preUpdatedRest = currRest.put(blockchain, tx, Diff.empty) // TODO: Doesn't handle scriptRuns/scriptComplexity if (preUpdatedRest.isOverfilled) { @@ -228,7 +233,7 @@ class UtxPoolImpl(time: Time, blockchain: Blockchain, spendableBalanceChanged: O } } } - .takeWhile(!_._4) // !currRest.isEmpty + .takeWhile(r => !r._4 && (r._1.isEmpty || !isTimeLimitReached)) // !currRest.isEmpty && (validTxs.isEmpty || !isTimeLimitReached) .reduce((_, right) => right) } @@ -287,7 +292,7 @@ class UtxPoolImpl(time: Time, blockchain: Blockchain, spendableBalanceChanged: O } private[UtxPoolImpl] def doCleanup(): Unit = { - UtxPoolImpl.this.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited) + UtxPoolImpl.this.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited, ScalaDuration.Inf) } } diff --git a/node/src/test/scala/com/wavesplatform/mining/BlockWithMaxBaseTargetTest.scala b/node/src/test/scala/com/wavesplatform/mining/BlockWithMaxBaseTargetTest.scala index 922ecbe25ea..247de150a0c 100644 --- a/node/src/test/scala/com/wavesplatform/mining/BlockWithMaxBaseTargetTest.scala +++ b/node/src/test/scala/com/wavesplatform/mining/BlockWithMaxBaseTargetTest.scala @@ -137,7 +137,7 @@ class BlockWithMaxBaseTargetTest extends FreeSpec with Matchers with WithDB with override def all = ??? override def size = ??? override def transactionById(transactionId: ByteStr) = ??? - override def packUnconfirmed(rest: MultiDimensionalMiningConstraint) = ??? + override def packUnconfirmed(rest: MultiDimensionalMiningConstraint, maxPackTime: Duration): (Seq[Transaction], MultiDimensionalMiningConstraint) = ??? override def close(): Unit = {} } val schedulerService: SchedulerService = Scheduler.singleThread("appender") diff --git a/node/src/test/scala/com/wavesplatform/settings/MinerSettingsSpecification.scala b/node/src/test/scala/com/wavesplatform/settings/MinerSettingsSpecification.scala index 7d0b291d444..9e2289af37e 100644 --- a/node/src/test/scala/com/wavesplatform/settings/MinerSettingsSpecification.scala +++ b/node/src/test/scala/com/wavesplatform/settings/MinerSettingsSpecification.scala @@ -4,6 +4,7 @@ import com.typesafe.config.ConfigFactory import net.ceedubs.ficus.Ficus._ import net.ceedubs.ficus.readers.ArbitraryTypeReader._ import org.scalatest.{FlatSpec, Matchers} + import scala.concurrent.duration._ class MinerSettingsSpecification extends FlatSpec with Matchers { @@ -20,6 +21,7 @@ class MinerSettingsSpecification extends FlatSpec with Matchers { | max-transactions-in-key-block: 300 | max-transactions-in-micro-block: 400 | min-micro-block-age: 3s + | max-pack-time: 5s | } |} """.stripMargin).resolve() diff --git a/node/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala b/node/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala index 17498a21eb8..de5ca9f904c 100644 --- a/node/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala +++ b/node/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala @@ -437,7 +437,7 @@ class UtxPoolSpecification val maxNumber = Math.max(utx.all.size / 2, 3) val rest = limitByNumber(maxNumber) - val (packed, restUpdated) = utx.packUnconfirmed(rest) + val (packed, restUpdated) = utx.packUnconfirmed(rest, 5.seconds) packed.lengthCompare(maxNumber) should be <= 0 if (maxNumber <= utx.all.size) restUpdated.isEmpty shouldBe true @@ -450,7 +450,7 @@ class UtxPoolSpecification time.advance(maxAge + 1000.millis) - val (packed, _) = utx.packUnconfirmed(limitByNumber(100)) + val (packed, _) = utx.packUnconfirmed(limitByNumber(100), 5.seconds) packed shouldBe 'empty utx.all shouldBe 'empty } @@ -462,7 +462,7 @@ class UtxPoolSpecification time.advance(offset) - val (packed, _) = utx.packUnconfirmed(limitByNumber(100)) + val (packed, _) = utx.packUnconfirmed(limitByNumber(100), 5.seconds) packed.size shouldBe 2 utx.all.size shouldBe 2 } @@ -517,7 +517,7 @@ class UtxPoolSpecification val constraint = MultiDimensionalMiningConstraint( NonEmptyList.of(OneDimensionalMiningConstraint(1, TxEstimators.scriptRunNumber), OneDimensionalMiningConstraint(Block.MaxTransactionsPerBlockVer3, TxEstimators.one))) - val (packed, _) = utx.packUnconfirmed(constraint) + val (packed, _) = utx.packUnconfirmed(constraint, 5.seconds) packed.size shouldBe (unscripted.size + 1) packed.count(scripted.contains) shouldBe 1 } @@ -548,7 +548,7 @@ class UtxPoolSpecification val poolSizeBefore = utxPool.size time.advance(maxAge * 2) - utxPool.packUnconfirmed(limitByNumber(100)) + utxPool.packUnconfirmed(limitByNumber(100), 5.seconds) poolSizeBefore should be > utxPool.size val portfolioAfter = utxPool.pessimisticPortfolio(sender) diff --git a/scalastyle-config.xml b/scalastyle-config.xml index a95ec648935..35445ba0f97 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -97,7 +97,7 @@ - + TODO|FIXME|todo|fixme|bug|BUG