From 663db5660014b41470ee5c3b941a85321dd007d2 Mon Sep 17 00:00:00 2001 From: Sergey Nazarov Date: Mon, 2 Dec 2024 17:12:31 +0300 Subject: [PATCH] Added Units registry (#3977) --- docker/Dockerfile | 2 +- docker/entrypoint.sh | 1 + .../scala/com/wavesplatform/Application.scala | 1 - .../api/http/DebugApiRoute.scala | 12 +- .../com/wavesplatform/mining/Miner.scala | 3 +- .../mining/microblocks/MicroBlockMiner.scala | 6 +- .../microblocks/MicroBlockMinerImpl.scala | 5 +- .../settings/BlockchainSettings.scala | 11 +- .../com/wavesplatform/state/Blockchain.scala | 14 +- .../com/wavesplatform/utx/UtxPoolImpl.scala | 47 +++--- .../wavesplatform/utx/UtxPriorityPool.scala | 149 +----------------- .../http/DebugApiRouteSpec.scala | 1 - .../mining/LightNodeBlockFieldsTest.scala | 3 +- .../mining/MicroBlockMinerSpec.scala | 6 +- .../state/BlockChallengeTest.scala | 4 +- .../utx/UtxPoolSpecification.scala | 12 -- .../utx/UtxPriorityPoolSpecification.scala | 104 +----------- 17 files changed, 66 insertions(+), 315 deletions(-) diff --git a/docker/Dockerfile b/docker/Dockerfile index 1ffdf2a56a..7eb6ff55c5 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,4 +1,4 @@ -ARG baseImage=eclipse-temurin:11-jre-noble +ARG baseImage=eclipse-temurin:21-jre-noble FROM $baseImage ENV WAVES_LOG_LEVEL=INFO diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 9d01196ea0..66d9965512 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -3,6 +3,7 @@ JAVA_OPTS="-XX:+ExitOnOutOfMemoryError -Xmx${WAVES_HEAP_SIZE} --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED -Dlogback.stdout.level=${WAVES_LOG_LEVEL} -Dlogback.file.directory=${WVLOG} -Dwaves.config.directory=/etc/waves diff --git a/node/src/main/scala/com/wavesplatform/Application.scala b/node/src/main/scala/com/wavesplatform/Application.scala index b3ac210cfd..cd399659f4 100644 --- a/node/src/main/scala/com/wavesplatform/Application.scala +++ b/node/src/main/scala/com/wavesplatform/Application.scala @@ -435,7 +435,6 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con scoreStatsReporter, configRoot, rocksDB, - () => utxStorage.getPriorityPool.map(_.compositeBlockchain), routeTimeout, heavyRequestScheduler ), diff --git a/node/src/main/scala/com/wavesplatform/api/http/DebugApiRoute.scala b/node/src/main/scala/com/wavesplatform/api/http/DebugApiRoute.scala index ffc2848a9f..4809c40927 100644 --- a/node/src/main/scala/com/wavesplatform/api/http/DebugApiRoute.scala +++ b/node/src/main/scala/com/wavesplatform/api/http/DebugApiRoute.scala @@ -55,7 +55,6 @@ case class DebugApiRoute( scoreReporter: Coeval[RxScoreObserver.Stats], configRoot: ConfigObject, db: RocksDBWriter, - priorityPoolBlockchain: () => Option[Blockchain], routeTimeout: RouteTimeout, heavyRequestScheduler: Scheduler ) extends ApiRoute @@ -197,14 +196,13 @@ case class DebugApiRoute( def validate: Route = path("validate")(jsonPost[JsObject] { jsv => - val resBlockchain = priorityPoolBlockchain().getOrElse(blockchain) val startTime = System.nanoTime() val parsedTransaction = TransactionFactory.fromSignedRequest(jsv) val tracedSnapshot = for { tx <- TracedResult(parsedTransaction) - diff <- TransactionDiffer.forceValidate(resBlockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)(resBlockchain, tx) + diff <- TransactionDiffer.forceValidate(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)(blockchain, tx) } yield (tx, diff) val error = tracedSnapshot.resultE match { @@ -218,7 +216,7 @@ case class DebugApiRoute( .fold( _ => this.serializer, { case (_, snapshot) => - val snapshotBlockchain = SnapshotBlockchain(resBlockchain, snapshot) + val snapshotBlockchain = SnapshotBlockchain(blockchain, snapshot) this.serializer.copy(blockchain = snapshotBlockchain) } ) @@ -230,8 +228,8 @@ case class DebugApiRoute( val meta = tx match { case ist: InvokeScriptTransaction => val result = diff.scriptResults.get(ist.id()) - TransactionMeta.Invoke(Height(resBlockchain.height), ist, TxMeta.Status.Succeeded, diff.scriptsComplexity, result) - case tx => TransactionMeta.Default(Height(resBlockchain.height), tx, TxMeta.Status.Succeeded, diff.scriptsComplexity) + TransactionMeta.Invoke(Height(blockchain.height), ist, TxMeta.Status.Succeeded, diff.scriptsComplexity, result) + case tx => TransactionMeta.Default(Height(blockchain.height), tx, TxMeta.Status.Succeeded, diff.scriptsComplexity) } serializer.transactionWithMetaJson(meta) } @@ -244,7 +242,7 @@ case class DebugApiRoute( case ist: InvokeScriptTrace => ist.maybeLoggedJson(logged = true)(serializer.invokeScriptResultWrites) case trace => trace.loggedJson }, - "height" -> resBlockchain.height + "height" -> blockchain.height ) error.fold(response ++ extendedJson)(err => diff --git a/node/src/main/scala/com/wavesplatform/mining/Miner.scala b/node/src/main/scala/com/wavesplatform/mining/Miner.scala index 77163abde0..e107527422 100644 --- a/node/src/main/scala/com/wavesplatform/mining/Miner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/Miner.scala @@ -84,8 +84,7 @@ class MinerImpl( settings.minerSettings, minerScheduler, appenderScheduler, - transactionAdded, - utx.getPriorityPool.map(p => p.nextMicroBlockSize(_)).getOrElse(identity) + transactionAdded ) def getNextBlockGenerationOffset(account: KeyPair): Either[String, FiniteDuration] = diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala index d53d1cf88b..8ba33bb11c 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMiner.scala @@ -30,8 +30,7 @@ object MicroBlockMiner { settings: MinerSettings, minerScheduler: SchedulerService, appenderScheduler: SchedulerService, - transactionAdded: Observable[Unit], - nextMicroBlockSize: Int => Int = identity + transactionAdded: Observable[Unit] ): MicroBlockMiner = new MicroBlockMinerImpl( setDebugState, @@ -41,7 +40,6 @@ object MicroBlockMiner { settings, minerScheduler, appenderScheduler, - transactionAdded, - nextMicroBlockSize + transactionAdded ) } diff --git a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala index 22b87d9b02..29fb35820a 100644 --- a/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala +++ b/node/src/main/scala/com/wavesplatform/mining/microblocks/MicroBlockMinerImpl.scala @@ -34,8 +34,7 @@ class MicroBlockMinerImpl( settings: MinerSettings, minerScheduler: SchedulerService, appenderScheduler: SchedulerService, - transactionAdded: Observable[Unit], - nextMicroBlockSize: Int => Int + transactionAdded: Observable[Unit] ) extends MicroBlockMiner with ScorexLogging { @@ -73,7 +72,7 @@ class MicroBlockMinerImpl( val mdConstraint = MultiDimensionalMiningConstraint( restTotalConstraint, OneDimensionalMiningConstraint( - nextMicroBlockSize(settings.maxTransactionsInMicroBlock), + settings.maxTransactionsInMicroBlock, TxEstimators.one, "MaxTxsInMicroBlock" ) diff --git a/node/src/main/scala/com/wavesplatform/settings/BlockchainSettings.scala b/node/src/main/scala/com/wavesplatform/settings/BlockchainSettings.scala index 81e5c2dd65..a6043d7e1e 100644 --- a/node/src/main/scala/com/wavesplatform/settings/BlockchainSettings.scala +++ b/node/src/main/scala/com/wavesplatform/settings/BlockchainSettings.scala @@ -78,7 +78,8 @@ case class FunctionalitySettings( xtnBuybackRewardPeriod: Int = Int.MaxValue, lightNodeBlockFieldsAbsenceInterval: Int = 1000, blockRewardBoostPeriod: Int = 1000, - paymentsCheckHeight: Int = 0 + paymentsCheckHeight: Int = 0, + unitsRegistryAddress: Option[String] = None ) { val allowLeasedBalanceTransferUntilHeight: Int = blockVersion3AfterHeight val allowTemporaryNegativeUntil: Long = lastTimeBasedForkParameter @@ -92,6 +93,8 @@ case class FunctionalitySettings( daoAddress.traverse(Address.fromString(_)).leftMap(_ => "Incorrect dao-address") lazy val xtnBuybackAddressParsed: Either[String, Option[Address]] = xtnBuybackAddress.traverse(Address.fromString(_)).leftMap(_ => "Incorrect xtn-buyback-address") + lazy val unitsRegistryAddressParsed: Either[String, Option[Address]] = + unitsRegistryAddress.traverse(Address.fromString(_)).leftMap(_ => "Incorrect units-registry-address") require(featureCheckBlocksPeriod > 0, "featureCheckBlocksPeriod must be greater than 0") require( @@ -133,7 +136,8 @@ object FunctionalitySettings { xtnBuybackAddress = Some("3PFjHWuH6WXNJbwnfLHqNFBpwBS5dkYjTfv"), xtnBuybackRewardPeriod = 100000, blockRewardBoostPeriod = 300_000, - paymentsCheckHeight = 4303300 + paymentsCheckHeight = 4303300, + unitsRegistryAddress = Some("3P8LfPXcveST7WKkV3UACQNdr6J3shPYong") ) val TESTNET: FunctionalitySettings = apply( @@ -149,7 +153,8 @@ object FunctionalitySettings { daoAddress = Some("3Myb6G8DkdBb8YcZzhrky65HrmiNuac3kvS"), xtnBuybackAddress = Some("3N13KQpdY3UU7JkWUBD9kN7t7xuUgeyYMTT"), xtnBuybackRewardPeriod = 2000, - blockRewardBoostPeriod = 2_000 + blockRewardBoostPeriod = 2_000, + unitsRegistryAddress = Some("3N9fwNGJcUcAbhh7YPr6mrpuGJD4tApZFsT") ) val STAGENET: FunctionalitySettings = apply( diff --git a/node/src/main/scala/com/wavesplatform/state/Blockchain.scala b/node/src/main/scala/com/wavesplatform/state/Blockchain.scala index ab6283ea2a..db5cd5af83 100644 --- a/node/src/main/scala/com/wavesplatform/state/Blockchain.scala +++ b/node/src/main/scala/com/wavesplatform/state/Blockchain.scala @@ -61,9 +61,9 @@ trait Blockchain { def balanceAtHeight(address: Address, height: Int, assetId: Asset = Waves): Option[(Int, Long)] - /** - * Retrieves Waves balance snapshot in the [from, to] range (inclusive) - * @return Balance snapshots from most recent to oldest. + /** Retrieves Waves balance snapshot in the [from, to] range (inclusive) + * @return + * Balance snapshots from most recent to oldest. */ def balanceSnapshots(address: Address, from: Int, to: Option[BlockId]): Seq[BalanceSnapshot] @@ -225,13 +225,17 @@ object Blockchain { blockchain.effectiveBalanceBanHeights(address).contains(height) def supportsLightNodeBlockFields(height: Int = blockchain.height): Boolean = - blockchain.featureActivationHeight(LightNode.id).exists(height >= _ + blockchain.settings.functionalitySettings.lightNodeBlockFieldsAbsenceInterval) + blockchain + .featureActivationHeight(LightNode.id) + .exists(height >= _ + blockchain.settings.functionalitySettings.lightNodeBlockFieldsAbsenceInterval) def blockRewardBoost(height: Int): Int = blockchain .featureActivationHeight(BlockchainFeatures.BoostBlockReward.id) .filter { boostHeight => boostHeight <= height && height < boostHeight + blockchain.settings.functionalitySettings.blockRewardBoostPeriod - }.fold(1)(_ => BlockRewardCalculator.RewardBoost) + } + .fold(1)(_ => BlockRewardCalculator.RewardBoost) + } } diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala index 23adf485ff..b38b3d468c 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala @@ -59,13 +59,13 @@ case class UtxPoolImpl( private[this] val inUTXPoolOrdering = TransactionsOrdering.InUTXPool(utxSettings.fastLaneAddresses) // State - val priorityPool = new UtxPriorityPool(blockchain) + val priorityPool = new UtxPriorityPool private[this] val transactions = new ConcurrentHashMap[ByteStr, Transaction]() override def getPriorityPool: Option[UtxPriorityPool] = Some(priorityPool) override def putIfNew(tx: Transaction, forceValidate: Boolean): TracedResult[ValidationError, Boolean] = { - if (transactions.containsKey(tx.id()) || priorityPool.contains(tx.id())) TracedResult.wrapValue(false) + if (transactions.containsKey(tx.id())) TracedResult.wrapValue(false) else putNewTx(tx, forceValidate) } @@ -171,10 +171,8 @@ case class UtxPoolImpl( removeIds(ids) } - def setPrioritySnapshots(discSnapshots: Seq[StateSnapshot]): Unit = { - val txs = priorityPool.setPriorityDiffs(discSnapshots) - txs.foreach(addTransaction(_, verify = false, canLock = false)) - } + def setPrioritySnapshots(discSnapshots: Seq[StateSnapshot]): Unit = + priorityPool.setPriorityDiffs(discSnapshots).foreach(addTransaction(_, verify = false)) def resetPriorityPool(): Unit = priorityPool.setPriorityDiffs(Seq.empty) @@ -186,23 +184,19 @@ case class UtxPoolImpl( } } - private[this] def removeIds(removed: Set[ByteStr]): Unit = { - val priorityRemoved = priorityPool.removeIds(removed) - val factRemoved = priorityRemoved ++ removed.flatMap(id => removeFromOrdPool(id)) - factRemoved.foreach(TxStateActions.removeMined(_)) - } + private[this] def removeIds(removed: Set[ByteStr]): Unit = + removed.flatMap(id => removeFromOrdPool(id)).foreach(TxStateActions.removeMined(_)) private[utx] def addTransaction( tx: Transaction, verify: Boolean, - forceValidate: Boolean = false, - canLock: Boolean = true + forceValidate: Boolean = false ): TracedResult[ValidationError, Boolean] = { val diffEi = { def calculateSnapshot(): TracedResult[ValidationError, StateSnapshot] = { if (forceValidate) TransactionDiffer.forceValidate(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)( - priorityPool.compositeBlockchain, + blockchain, tx ) else @@ -213,13 +207,12 @@ case class UtxPoolImpl( verify, enableExecutionLog = true )( - priorityPool.compositeBlockchain, + blockchain, tx ) } - if (canLock) priorityPool.optimisticRead(calculateSnapshot())(_.resultE.isLeft) - else calculateSnapshot() + calculateSnapshot() } if (!verify || diffEi.resultE.isRight) { @@ -235,13 +228,12 @@ case class UtxPoolImpl( } override def all: Seq[Transaction] = - (priorityPool.priorityTransactions ++ nonPriorityTransactions).distinct + (priorityPool.priorityTransactionIds.flatMap(id => Option(transactions.get(id))) ++ nonPriorityTransactions).distinct override def size: Int = transactions.size override def transactionById(transactionId: ByteStr): Option[Transaction] = Option(transactions.get(transactionId)) - .orElse(priorityPool.transactionById(transactionId)) private def scriptedAddresses(tx: Transaction): Set[Address] = tx match { case t if inUTXPoolOrdering.isWhitelisted(t) => Set.empty @@ -256,23 +248,21 @@ case class UtxPoolImpl( private[this] case class TxEntry(tx: Transaction, priority: Boolean) private[this] def createTxEntrySeq(): Seq[TxEntry] = - priorityPool.priorityTransactions.map(TxEntry(_, priority = true)) ++ nonPriorityTransactions.map( - TxEntry(_, priority = false) - ) + priorityPool.priorityTransactionIds.flatMap(id => Option(transactions.get(id)).map(TxEntry(_, priority = true))) ++ + nonPriorityTransactions.map(TxEntry(_, priority = false)) override def packUnconfirmed( initialConstraint: MultiDimensionalMiningConstraint, prevStateHash: Option[ByteStr], strategy: PackStrategy, cancelled: () => Boolean - ): (Option[Seq[Transaction]], MultiDimensionalMiningConstraint, Option[ByteStr]) = { + ): (Option[Seq[Transaction]], MultiDimensionalMiningConstraint, Option[ByteStr]) = pack(TransactionDiffer(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true))( initialConstraint, strategy, prevStateHash, cancelled ) - } def cleanUnconfirmed(): Unit = { log.trace(s"Starting UTX cleanup at height ${blockchain.height}") @@ -286,7 +276,7 @@ case class UtxPoolImpl( } else { val differ = if (!isMiningEnabled && utxSettings.forceValidateInCleanup) { TransactionDiffer.forceValidate(blockchain.lastBlockTimestamp, time.correctedTime(), enableExecutionLog = true)( - priorityPool.compositeBlockchain, + blockchain, _ ) } else { @@ -296,7 +286,7 @@ case class UtxPoolImpl( utxSettings.alwaysUnlimitedExecution, enableExecutionLog = true )( - priorityPool.compositeBlockchain, + blockchain, _ ) } @@ -480,11 +470,10 @@ case class UtxPoolImpl( log.trace( s"Validated ${packResult.validatedTransactions.size} transactions, " + - s"of which ${packResult.transactions.fold(0)(_.size)} were packed, ${transactions.size() + priorityPool.priorityTransactions.size} transactions remaining" + s"of which ${packResult.transactions.fold(0)(_.size)} were packed, ${transactions.size()} transactions remaining" ) if (packResult.removedTransactions.nonEmpty) log.trace(s"Removing invalid transactions: ${packResult.removedTransactions.mkString(", ")}") - priorityPool.invalidateTxs(packResult.removedTransactions) (packResult.transactions.map(_.reverse), packResult.constraint, packResult.stateHash) } @@ -563,7 +552,7 @@ case class UtxPoolImpl( private def cleanupLoop(): Unit = cleanupScheduler.execute { () => while (scheduled.compareAndSet(true, false)) { - if (!transactions.isEmpty || priorityPool.priorityTransactions.nonEmpty) { + if (!transactions.isEmpty) { cleanUnconfirmed() } } diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPriorityPool.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPriorityPool.scala index ec1000cee0..242b0f0899 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPriorityPool.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPriorityPool.scala @@ -1,152 +1,17 @@ package com.wavesplatform.utx -import cats.implicits.toFoldableOps -import com.wavesplatform.ResponsivenessLogs import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.state.SnapshotBlockchain -import com.wavesplatform.state.{Blockchain, StateSnapshot} +import com.wavesplatform.state.StateSnapshot import com.wavesplatform.transaction.Transaction -import com.wavesplatform.utils.{OptimisticLockable, ScorexLogging} -import kamon.Kamon -import kamon.metric.MeasurementUnit -import java.time.Duration -import java.time.temporal.ChronoUnit -import scala.annotation.tailrec +final class UtxPriorityPool { -final class UtxPriorityPool(realBlockchain: Blockchain) extends ScorexLogging with OptimisticLockable { - import UtxPriorityPool.* + @volatile private var priorityTxIds = Seq.empty[ByteStr] - private[this] case class PriorityData(diff: StateSnapshot, isValid: Boolean = true) + def priorityTransactionIds: Seq[ByteStr] = priorityTxIds - @volatile private[this] var priorityDiffs = Seq.empty[PriorityData] - @volatile private[this] var priorityDiffsCombined = StateSnapshot.empty - - def validPriorityDiffs: Seq[StateSnapshot] = priorityDiffs.takeWhile(_.isValid).map(_.diff) - def priorityTransactions: Seq[Transaction] = priorityDiffs.flatMap(_.diff.transactionsValues) - def priorityTransactionIds: Seq[ByteStr] = priorityTransactions.map(_.id()) - - def compositeBlockchain: Blockchain = - if (priorityDiffs.isEmpty) realBlockchain - else SnapshotBlockchain(realBlockchain, priorityDiffsCombined) - - def optimisticRead[T](f: => T)(shouldRecheck: T => Boolean): T = - this.readLockCond(f)(shouldRecheck) - - private[utx] def setPriorityDiffs(discDiffs: Seq[StateSnapshot]): Set[Transaction] = - if (discDiffs.isEmpty) { - clear() - Set.empty - } else { - val transactions = updateDiffs(_ => discDiffs.map(PriorityData(_))) - log.trace( - s"Priority pool updated with diffs: [${discDiffs.map(_.hashString).mkString(", ")}], transactions order: [${priorityTransactionIds.mkString(", ")}]" - ) - transactions - } - - private[utx] def invalidateTxs(removed: Set[ByteStr]): Unit = - updateDiffs(_.map { pd => - if (pd.diff.transactionIds.exists(removed)) { - val keep = pd.diff.transactions.filterNot(nti => removed(nti._2.transaction.id())) - pd.copy(StateSnapshot.empty.copy(keep), isValid = false) - } else pd - }) - - private[utx] def removeIds(removed: Set[ByteStr]): Set[Transaction] = { - case class RemoveResult(diffsRest: Seq[PriorityData], removed: Set[Transaction]) - - @tailrec - def removeRec(diffs: Seq[PriorityData], cleanRemoved: Set[Transaction] = Set.empty): RemoveResult = diffs match { - case Nil => - RemoveResult(Nil, cleanRemoved) - - case pd +: rest if pd.diff.transactionIds.subsetOf(removed) => - removeRec(rest, cleanRemoved ++ pd.diff.transactionsValues) - - case _ if cleanRemoved.map(_.id()) == removed => - RemoveResult(diffs, cleanRemoved) - - case _ => // Partial remove, invalidate priority pool - RemoveResult(diffs.map(_.copy(isValid = false)), cleanRemoved) - } - - val result = removeRec(this.priorityDiffs) - if (result.removed.nonEmpty) - log.trace( - s"Removing diffs from priority pool: removed txs: [${result.removed.map(_.id()).mkString(", ")}], remaining diffs: [${result.diffsRest.map(_.diff.hashString).mkString(", ")}]" - ) - - updateDiffs(_ => result.diffsRest) - if (priorityTransactionIds.nonEmpty) log.trace(s"Priority pool transactions order: ${priorityTransactionIds.mkString(", ")}") - - result.removed - } - - def transactionById(txId: ByteStr): Option[Transaction] = - priorityDiffsCombined.transactions.get(txId).map(_.transaction) - - def contains(txId: ByteStr): Boolean = transactionById(txId).nonEmpty - - def nextMicroBlockSize(limit: Int): Int = { - @tailrec - def nextMicroBlockSizeRec(last: Int, diffs: Seq[StateSnapshot]): Int = (diffs: @unchecked) match { - case Nil => last.max(limit) - case diff +: _ if last + diff.transactions.size > limit => - if (last == 0) diff.transactions.size // First micro - else last - case diff +: rest => nextMicroBlockSizeRec(last + diff.transactions.size, rest) - } - nextMicroBlockSizeRec(0, priorityDiffs.map(_.diff)) - } - - private[utx] def clear(): Seq[Transaction] = { - val txs = this.priorityTransactions - updateDiffs(_ => Nil) - txs - } - - private[this] def updateDiffs(f: Seq[PriorityData] => Seq[PriorityData]): Set[Transaction] = { - val oldTxs = priorityTransactions.toSet - - priorityDiffs = f(priorityDiffs).filterNot(_.diff.transactions.isEmpty) - priorityDiffsCombined = validPriorityDiffs.combineAll - - val newTxs = priorityTransactions.toSet - - val removed = oldTxs diff newTxs - removed.foreach(PoolMetrics.removeTransactionPriority) - (newTxs diff oldTxs).foreach { tx => - PoolMetrics.addTransactionPriority(tx) - ResponsivenessLogs.writeEvent(realBlockchain.height, tx, ResponsivenessLogs.TxEvent.Received) - } - removed - } - - // noinspection TypeAnnotation - private[this] object PoolMetrics { - private[this] val SampleInterval: Duration = Duration.of(500, ChronoUnit.MILLIS) - - private[this] val prioritySizeStats = Kamon.rangeSampler("utx.priority-pool-size", MeasurementUnit.none, SampleInterval).withoutTags() - private[this] val priorityBytesStats = - Kamon.rangeSampler("utx.priority-pool-bytes", MeasurementUnit.information.bytes, SampleInterval).withoutTags() - - def addTransactionPriority(tx: Transaction): Unit = { - prioritySizeStats.increment() - priorityBytesStats.increment(tx.bytes().length) - } - - def removeTransactionPriority(tx: Transaction): Unit = { - prioritySizeStats.decrement() - priorityBytesStats.decrement(tx.bytes().length) - } - } -} - -private object UtxPriorityPool { - implicit class DiffExt(private val snapshot: StateSnapshot) extends AnyVal { - def contains(txId: ByteStr): Boolean = snapshot.transactions.contains(txId) - def transactionsValues: Seq[Transaction] = snapshot.transactions.map(_._2.transaction).toSeq - def transactionIds: collection.Set[ByteStr] = transactionsValues.map(_.id()).toSet + private[utx] def setPriorityDiffs(discDiffs: Seq[StateSnapshot]): Set[Transaction] = { + priorityTxIds = discDiffs.flatMap(_.transactions.keys) + discDiffs.flatMap(_.transactions.values.map(_.transaction)).toSet } } diff --git a/node/tests/src/test/scala/com/wavesplatform/http/DebugApiRouteSpec.scala b/node/tests/src/test/scala/com/wavesplatform/http/DebugApiRouteSpec.scala index 6f5453d75f..52527038bd 100644 --- a/node/tests/src/test/scala/com/wavesplatform/http/DebugApiRouteSpec.scala +++ b/node/tests/src/test/scala/com/wavesplatform/http/DebugApiRouteSpec.scala @@ -90,7 +90,6 @@ class DebugApiRouteSpec null, configObject, domain.rocksDBWriter, - () => Some(domain.blockchain), new RouteTimeout(60.seconds)(sharedScheduler), sharedScheduler ) diff --git a/node/tests/src/test/scala/com/wavesplatform/mining/LightNodeBlockFieldsTest.scala b/node/tests/src/test/scala/com/wavesplatform/mining/LightNodeBlockFieldsTest.scala index 34ef6e178c..deaa10a9d0 100644 --- a/node/tests/src/test/scala/com/wavesplatform/mining/LightNodeBlockFieldsTest.scala +++ b/node/tests/src/test/scala/com/wavesplatform/mining/LightNodeBlockFieldsTest.scala @@ -45,8 +45,7 @@ class LightNodeBlockFieldsTest extends PropSpec with WithDomain { d.settings.minerSettings, miner.minerScheduler, miner.appenderScheduler, - Observable.empty, - identity + Observable.empty ) val challenger = new BlockChallengerImpl( d.blockchain, diff --git a/node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala b/node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala index ccfc9c26ae..ba83a71d86 100644 --- a/node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala +++ b/node/tests/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala @@ -42,8 +42,7 @@ class MicroBlockMinerSpec extends FlatSpec with PathMockFactory with WithDomain settings.minerSettings, scheduler, scheduler, - Observable.empty, - identity + Observable.empty ) def generateBlocks( @@ -165,8 +164,7 @@ class MicroBlockMinerSpec extends FlatSpec with PathMockFactory with WithDomain RideV6.minerSettings, miner, appender, - utxEvents.collect { case _: UtxEvent.TxAdded => () }, - identity + utxEvents.collect { case _: UtxEvent.TxAdded => () } ) val block = d.appendBlock(ProtoBlockVersion) diff --git a/node/tests/src/test/scala/com/wavesplatform/state/BlockChallengeTest.scala b/node/tests/src/test/scala/com/wavesplatform/state/BlockChallengeTest.scala index 734f906a74..6347edfb12 100644 --- a/node/tests/src/test/scala/com/wavesplatform/state/BlockChallengeTest.scala +++ b/node/tests/src/test/scala/com/wavesplatform/state/BlockChallengeTest.scala @@ -1878,8 +1878,8 @@ class BlockChallengeTest testTime.setTime(betterBlock.header.timestamp) appender(betterBlock).runSyncUnsafe() should beRight d.lastBlock shouldBe betterBlock - d.utxPool.priorityPool.priorityTransactions.size shouldBe txs.size - d.utxPool.priorityPool.priorityTransactions.toSet shouldBe txs.toSet + d.utxPool.size shouldBe txs.size + d.utxPool.all.toSet shouldBe txs.toSet } } diff --git a/node/tests/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala b/node/tests/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala index aa056d23da..1ce057be22 100644 --- a/node/tests/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala +++ b/node/tests/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala @@ -1022,18 +1022,6 @@ class UtxPoolSpecification extends FreeSpec with MockFactory with BlocksTransact d.utxPool.nonPriorityTransactions.toSet shouldBe transfers.toSet } - "takes the priority diff into account" in withDomain() { d => - d.helpers.creditWavesToDefaultSigner(11.waves) - val transfer1 = TxHelpers.transfer(amount = 10.waves) - val transfer2 = TxHelpers.transfer(amount = 10.waves) // Double spend - - d.utxPool.priorityPool.setPriorityDiffs(Seq(d.createDiff(transfer1))) - d.utxPool.addTransaction(transfer2, verify = false) - - d.utxPool.cleanUnconfirmed() - d.utxPool.nonPriorityTransactions shouldBe Nil - } - "doesnt validate transactions which are removed" in { val gen = for { acc <- accountGen diff --git a/node/tests/src/test/scala/com/wavesplatform/utx/UtxPriorityPoolSpecification.scala b/node/tests/src/test/scala/com/wavesplatform/utx/UtxPriorityPoolSpecification.scala index a4d174b353..4d2294fa48 100644 --- a/node/tests/src/test/scala/com/wavesplatform/utx/UtxPriorityPoolSpecification.scala +++ b/node/tests/src/test/scala/com/wavesplatform/utx/UtxPriorityPoolSpecification.scala @@ -1,13 +1,9 @@ package com.wavesplatform.utx -import com.wavesplatform.account.KeyPair import com.wavesplatform.db.WithState -import com.wavesplatform.lang.directives.values.V3 -import com.wavesplatform.lang.v1.compiler.TestCompiler import com.wavesplatform.mining.MultiDimensionalMiningConstraint import com.wavesplatform.settings.WavesSettings import com.wavesplatform.test.* -import com.wavesplatform.transaction.Asset.IssuedAsset import com.wavesplatform.transaction.TxHelpers import com.wavesplatform.utx.UtxPool.PackStrategy @@ -26,18 +22,6 @@ class UtxPriorityPoolSpecification extends FreeSpec with SharedDomain { private def pack() = domain.utxPool.packUnconfirmed(MultiDimensionalMiningConstraint.Unlimited, None, PackStrategy.Unlimited)._1 - private def mkHeightSensitiveScript(sender: KeyPair) = - TxHelpers.setScript( - sender, - TestCompiler(V3).compileExpression(s""" - |match tx { - | case _: TransferTransaction => height % 2 == ${domain.blockchain.height % 2} - | case _ => true - |} - |""".stripMargin), - fee = 0.01.waves - ) - "priority pool" - { "preserves correct order of transactions" in { val id = domain.appendKeyBlock().id() @@ -49,100 +33,26 @@ class UtxPriorityPoolSpecification extends FreeSpec with SharedDomain { domain.appendKeyBlock(ref = Some(id)) val expectedTransactions = Seq(t1, t2) - domain.utxPool.priorityPool.priorityTransactions shouldBe expectedTransactions + domain.utxPool.all shouldBe expectedTransactions pack() shouldBe Some(expectedTransactions) } - "takes into account priority txs when packing" in { - val id = domain.appendKeyBlock().id() - val bob = nextKeyPair - val transfer1 = TxHelpers.transfer(alice, bob.toAddress, 10.001.waves, fee = 0.001.waves) - - domain.appendMicroBlock(transfer1) - domain.appendKeyBlock(ref = Some(id)) - - domain.utxPool.priorityPool.priorityTransactions shouldBe Seq(transfer1) - - val transfer2 = TxHelpers.transfer(bob, nextKeyPair.toAddress, 10.waves, fee = 0.001.waves) - - domain.utxPool.putIfNew(transfer2).resultE should beRight - domain.utxPool.nonPriorityTransactions shouldBe Seq(transfer2) - pack() shouldBe Some(Seq(transfer1, transfer2)) - } - - "counts microblock size from priority diffs" in { - val ref = domain.appendKeyBlock().id() - domain.appendMicroBlock(Seq.tabulate(5) { i => - TxHelpers.transfer(alice, TxHelpers.signer(200 + i).toAddress) - }*) - domain.appendMicroBlock(Seq.tabulate(5) { i => - TxHelpers.transfer(alice, TxHelpers.signer(300 + i).toAddress) - }*) - - domain.appendKeyBlock(ref = Some(ref)) - // priority pool contains two microblocks, 5 txs each - domain.utxPool.priorityPool.nextMicroBlockSize(3) shouldBe 5 - domain.utxPool.priorityPool.nextMicroBlockSize(5) shouldBe 5 - domain.utxPool.priorityPool.nextMicroBlockSize(8) shouldBe 5 - domain.utxPool.priorityPool.nextMicroBlockSize(10) shouldBe 10 - domain.utxPool.priorityPool.nextMicroBlockSize(12) shouldBe 12 - } - - "cleans up priority pool only when packing, not during cleanup" in { - - val bob, carol = nextKeyPair - - domain.appendKeyBlock() - val rollbackTarget = domain.appendMicroBlock( - TxHelpers.transfer(alice, bob.toAddress, 10.015.waves, fee = 0.001.waves), - mkHeightSensitiveScript(bob) - ) - val transferToCarol = TxHelpers.transfer(bob, carol.toAddress, 10.waves, fee = 0.005.waves) - domain.appendMicroBlock(transferToCarol) - - domain.appendKeyBlock(ref = Some(rollbackTarget)) - domain.utxPool.cleanUnconfirmed() - domain.utxPool.priorityPool.priorityTransactions shouldEqual Seq(transferToCarol) - pack() shouldBe None - domain.utxPool.priorityPool.priorityTransactions shouldBe empty - } - - "continues packing when priority snapshot contains no valid transactions" in { - val bob = nextKeyPair - domain.appendBlock( - TxHelpers.transfer(alice, bob.toAddress, 10.02.waves, fee = 0.001.waves), - mkHeightSensitiveScript(bob) - ) - val ref = domain.appendKeyBlock().id() - val transfer1 = TxHelpers.transfer(bob, nextKeyPair.toAddress, 10.waves, fee = 0.005.waves) - domain.appendMicroBlock(transfer1) - domain.appendKeyBlock(ref = Some(ref)) - domain.utxPool.priorityPool.priorityTransactions shouldEqual Seq(transfer1) - - val createAlias = TxHelpers.createAlias("0xbob", bob, 0.005.waves) - domain.utxPool.putIfNew(createAlias).resultE should beRight - domain.utxPool.all shouldEqual Seq(transfer1, createAlias) - - pack() shouldEqual Some(Seq(createAlias)) - } - "tx from last microblock is placed on next height ahead of new txs after appending key block" in { domain.utxPool.removeAll(domain.utxPool.nonPriorityTransactions) val blockId = domain.appendKeyBlock().id() - - val issue = TxHelpers.issue(alice) - val transfer = TxHelpers.transfer(alice, asset = IssuedAsset(issue.id())) + val issue = TxHelpers.issue(alice) domain.appendMicroBlock(issue) domain.blockchain.transactionInfo(issue.id()) shouldBe defined - domain.utxPool.priorityPool.priorityTransactions shouldBe Nil + domain.utxPool.all shouldBe Nil domain.appendKeyBlock(ref = Some(blockId)) domain.blockchain.transactionInfo(issue.id()) shouldBe None - domain.utxPool.priorityPool.priorityTransactions shouldBe Seq(issue) + domain.utxPool.all shouldBe Seq(issue) - domain.utxPool.putIfNew(transfer) - pack() shouldBe Some(List(issue, transfer)) + val secondIssue = TxHelpers.issue(alice, fee = 2.waves) + domain.utxPool.putIfNew(secondIssue) + pack() shouldBe Some(List(issue, secondIssue)) } } }