Skip to content

Commit

Permalink
NODE-1620: UTX packUnconfirmed limit by time (wavesplatform#2278)
Browse files Browse the repository at this point in the history
  • Loading branch information
Karasiq authored and Sergey Nazarov committed May 24, 2019
1 parent 954701d commit aff12f8
Show file tree
Hide file tree
Showing 10 changed files with 29 additions and 16 deletions.
3 changes: 3 additions & 0 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,9 @@ waves {

# Minimal block generation offset
minimal-block-generation-offset = 0

# Max packUnconfirmed time
max-pack-time = ${waves.miner.micro-block-interval}
}

# Node's REST API settings
Expand Down
2 changes: 1 addition & 1 deletion node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ object Importer extends ScorexLogging {
override def all = ???
override def size = ???
override def transactionById(transactionId: ByteStr) = ???
override def packUnconfirmed(rest: MultiDimensionalMiningConstraint) = ???
override def packUnconfirmed(rest: MultiDimensionalMiningConstraint, maxPackTime: Duration): (Seq[Transaction], MultiDimensionalMiningConstraint) = ???
override def close(): Unit = {}
}

Expand Down
4 changes: 2 additions & 2 deletions node/src/main/scala/com/wavesplatform/mining/Miner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class MinerImpl(allChannels: ChannelGroup,
consensusData <- consensusData(height, account, lastBlock, refBlockBT, refBlockTS, balance, currentTime)
estimators = MiningConstraints(blockchainUpdater, height, Some(minerSettings))
mdConstraint = MultiDimensionalMiningConstraint(estimators.total, estimators.keyBlock)
(unconfirmed, updatedMdConstraint) = metrics.measureLog("packing unconfirmed transactions for block")(utx.packUnconfirmed(mdConstraint))
(unconfirmed, updatedMdConstraint) = metrics.measureLog("packing unconfirmed transactions for block")(utx.packUnconfirmed(mdConstraint, settings.minerSettings.maxPackTime))
_ = log.debug(s"Adding ${unconfirmed.size} unconfirmed transaction(s) to new block")
block <- Block
.buildAndSign(version.toByte, currentTime, refBlockID, consensusData, unconfirmed, account, blockFeatures(version))
Expand Down Expand Up @@ -177,7 +177,7 @@ class MinerImpl(allChannels: ChannelGroup,
} else {
val (unconfirmed, updatedTotalConstraint) = metrics.measureLog("packing unconfirmed transactions for microblock") {
val mdConstraint = MultiDimensionalMiningConstraint(restTotalConstraint, constraints.micro)
val (unconfirmed, updatedMdConstraint) = utx.packUnconfirmed(mdConstraint)
val (unconfirmed, updatedMdConstraint) = utx.packUnconfirmed(mdConstraint, settings.minerSettings.maxPackTime)
(unconfirmed, updatedMdConstraint.constraints.head)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ case class MinerSettings(enable: Boolean,
minimalBlockGenerationOffset: FiniteDuration,
maxTransactionsInKeyBlock: Int,
maxTransactionsInMicroBlock: Int,
minMicroBlockAge: FiniteDuration) {
minMicroBlockAge: FiniteDuration,
maxPackTime: FiniteDuration) {
require(maxTransactionsInMicroBlock <= Miner.MaxTransactionsPerMicroblock)
}
4 changes: 3 additions & 1 deletion node/src/main/scala/com/wavesplatform/utx/UtxPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.wavesplatform.state.Portfolio
import com.wavesplatform.transaction._
import com.wavesplatform.transaction.smart.script.trace.TracedResult

import scala.concurrent.duration.Duration

trait UtxPool extends AutoCloseable {
self =>

Expand All @@ -25,6 +27,6 @@ trait UtxPool extends AutoCloseable {

def transactionById(transactionId: ByteStr): Option[Transaction]

def packUnconfirmed(rest: MultiDimensionalMiningConstraint): (Seq[Transaction], MultiDimensionalMiningConstraint)
def packUnconfirmed(rest: MultiDimensionalMiningConstraint, maxPackTime: Duration): (Seq[Transaction], MultiDimensionalMiningConstraint)

}
13 changes: 9 additions & 4 deletions node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import monix.execution.{Cancelable, Scheduler}
import monix.reactive.{Observable, Observer}

import scala.collection.JavaConverters._
import scala.concurrent.duration.{Duration => ScalaDuration}
import scala.util.{Left, Right}

class UtxPoolImpl(time: Time, blockchain: Blockchain, spendableBalanceChanged: Observer[(Address, Asset)], utxSettings: UtxSettings)
Expand Down Expand Up @@ -187,12 +188,16 @@ class UtxPoolImpl(time: Time, blockchain: Blockchain, spendableBalanceChanged: O

override def transactionById(transactionId: ByteStr): Option[Transaction] = Option(transactions.get(transactionId))

override def packUnconfirmed(rest: MultiDimensionalMiningConstraint): (Seq[Transaction], MultiDimensionalMiningConstraint) = {
override def packUnconfirmed(rest: MultiDimensionalMiningConstraint, maxPackTime: ScalaDuration): (Seq[Transaction], MultiDimensionalMiningConstraint) = {
val differ = TransactionDiffer(blockchain.lastBlockTimestamp, time.correctedTime(), blockchain.height) _
val (reversedValidTxs, _, finalConstraint, _, _, totalIterations) = PoolMetrics.packTimeStats.measure {
val startTime = System.nanoTime()
def isTimeLimitReached: Boolean = maxPackTime.isFinite() && (System.nanoTime() - startTime) >= maxPackTime.toNanos

transactions.values.asScala.toSeq
.sorted(TransactionsOrdering.InUTXPool)
.iterator .scanLeft((Seq.empty[Transaction], Monoid[Diff].empty, rest, false, rest, 0)) {
.iterator
.scanLeft((Seq.empty[Transaction], Monoid[Diff].empty, rest, false, rest, 0)) {
case ((valid, diff, currRest, _, lastOverfilled, iterations), tx) =>
val preUpdatedRest = currRest.put(blockchain, tx, Diff.empty) // TODO: Doesn't handle scriptRuns/scriptComplexity
if (preUpdatedRest.isOverfilled) {
Expand Down Expand Up @@ -228,7 +233,7 @@ class UtxPoolImpl(time: Time, blockchain: Blockchain, spendableBalanceChanged: O
}
}
}
.takeWhile(!_._4) // !currRest.isEmpty
.takeWhile(r => !r._4 && (r._1.isEmpty || !isTimeLimitReached)) // !currRest.isEmpty && (validTxs.isEmpty || !isTimeLimitReached)
.reduce((_, right) => right)
}

Expand Down Expand Up @@ -287,7 +292,7 @@ class UtxPoolImpl(time: Time, blockchain: Blockchain, spendableBalanceChanged: O
}

private[UtxPoolImpl] def doCleanup(): Unit = {
UtxPoolImpl.this.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited)
UtxPoolImpl.this.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited, ScalaDuration.Inf)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class BlockWithMaxBaseTargetTest extends FreeSpec with Matchers with WithDB with
override def all = ???
override def size = ???
override def transactionById(transactionId: ByteStr) = ???
override def packUnconfirmed(rest: MultiDimensionalMiningConstraint) = ???
override def packUnconfirmed(rest: MultiDimensionalMiningConstraint, maxPackTime: Duration): (Seq[Transaction], MultiDimensionalMiningConstraint) = ???
override def close(): Unit = {}
}
val schedulerService: SchedulerService = Scheduler.singleThread("appender")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import com.typesafe.config.ConfigFactory
import net.ceedubs.ficus.Ficus._
import net.ceedubs.ficus.readers.ArbitraryTypeReader._
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.duration._

class MinerSettingsSpecification extends FlatSpec with Matchers {
Expand All @@ -20,6 +21,7 @@ class MinerSettingsSpecification extends FlatSpec with Matchers {
| max-transactions-in-key-block: 300
| max-transactions-in-micro-block: 400
| min-micro-block-age: 3s
| max-pack-time: 5s
| }
|}
""".stripMargin).resolve()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ class UtxPoolSpecification

val maxNumber = Math.max(utx.all.size / 2, 3)
val rest = limitByNumber(maxNumber)
val (packed, restUpdated) = utx.packUnconfirmed(rest)
val (packed, restUpdated) = utx.packUnconfirmed(rest, 5.seconds)

packed.lengthCompare(maxNumber) should be <= 0
if (maxNumber <= utx.all.size) restUpdated.isEmpty shouldBe true
Expand All @@ -450,7 +450,7 @@ class UtxPoolSpecification

time.advance(maxAge + 1000.millis)

val (packed, _) = utx.packUnconfirmed(limitByNumber(100))
val (packed, _) = utx.packUnconfirmed(limitByNumber(100), 5.seconds)
packed shouldBe 'empty
utx.all shouldBe 'empty
}
Expand All @@ -462,7 +462,7 @@ class UtxPoolSpecification

time.advance(offset)

val (packed, _) = utx.packUnconfirmed(limitByNumber(100))
val (packed, _) = utx.packUnconfirmed(limitByNumber(100), 5.seconds)
packed.size shouldBe 2
utx.all.size shouldBe 2
}
Expand Down Expand Up @@ -517,7 +517,7 @@ class UtxPoolSpecification
val constraint = MultiDimensionalMiningConstraint(
NonEmptyList.of(OneDimensionalMiningConstraint(1, TxEstimators.scriptRunNumber),
OneDimensionalMiningConstraint(Block.MaxTransactionsPerBlockVer3, TxEstimators.one)))
val (packed, _) = utx.packUnconfirmed(constraint)
val (packed, _) = utx.packUnconfirmed(constraint, 5.seconds)
packed.size shouldBe (unscripted.size + 1)
packed.count(scripted.contains) shouldBe 1
}
Expand Down Expand Up @@ -548,7 +548,7 @@ class UtxPoolSpecification
val poolSizeBefore = utxPool.size

time.advance(maxAge * 2)
utxPool.packUnconfirmed(limitByNumber(100))
utxPool.packUnconfirmed(limitByNumber(100), 5.seconds)

poolSizeBefore should be > utxPool.size
val portfolioAfter = utxPool.pessimisticPortfolio(sender)
Expand Down
2 changes: 1 addition & 1 deletion scalastyle-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
<check level="warning" class="org.scalastyle.scalariform.PublicMethodsHaveTypeChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.NewLineAtEofChecker" enabled="true"></check>
<check level="warning" class="org.scalastyle.file.NoNewLineAtEofChecker" enabled="false"></check>
<check level="warning" class="org.scalastyle.scalariform.TodoCommentChecker" enabled="true">
<check level="warning" class="org.scalastyle.scalariform.TodoCommentChecker" enabled="false">
<parameters>
<parameter name="words">TODO|FIXME|todo|fixme|bug|BUG</parameter>
</parameters>
Expand Down

0 comments on commit aff12f8

Please sign in to comment.