From 88d076d063bfc5d9b2f2f8a066a42ebfeea5249a Mon Sep 17 00:00:00 2001 From: Le Karasique Date: Mon, 18 Oct 2021 16:03:28 +0300 Subject: [PATCH] NODE-2354: Remove pessimistic portfolio (UTX pool) (#3558) --- .../wavesplatform/it/api/AsyncHttpApi.scala | 7 +- .../wavesplatform/it/api/SyncHttpApi.scala | 3 - .../it/sync/AmountAsStringSuite.scala | 4 - .../it/sync/debug/DebugPortfoliosSuite.scala | 57 ----- .../main/resources/swagger-ui/openapi.yaml | 57 ----- .../scala/com/wavesplatform/Application.scala | 2 +- .../scala/com/wavesplatform/Importer.scala | 17 +- .../api/http/DebugApiRoute.scala | 57 ++--- .../scala/com/wavesplatform/utx/UtxPool.scala | 8 +- .../com/wavesplatform/utx/UtxPoolImpl.scala | 71 +----- .../wavesplatform/utx/UtxPriorityPool.scala | 9 +- .../com/wavesplatform/history/Domain.scala | 3 +- .../wavesplatform/mining/BlockV5Test.scala | 16 +- .../mining/BlockWithMaxBaseTargetTest.scala | 8 +- .../mining/MicroBlockMinerSpec.scala | 8 +- .../mining/MiningFailuresSuite.scala | 4 +- .../mining/MiningWithRewardSuite.scala | 14 +- .../appender/ExtensionAppenderSpec.scala | 3 +- .../InvokeScriptComplexitySpec.scala | 5 +- .../wavesplatform/utx/UtxFailedTxsSpec.scala | 7 +- .../utx/UtxPoolSpecification.scala | 209 ++++-------------- .../utx/UtxPriorityPoolSpecification.scala | 42 +--- 22 files changed, 122 insertions(+), 489 deletions(-) delete mode 100644 node-it/src/test/scala/com/wavesplatform/it/sync/debug/DebugPortfoliosSuite.scala diff --git a/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala b/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala index 49a2023c74b..61b9d843af4 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala @@ -933,11 +933,6 @@ object AsyncHttpApi extends Assertions { implicit val leaseBalanceFormat: Reads[LeaseBalance] = Json.reads[LeaseBalance] implicit val portfolioFormat: Reads[Portfolio] = Json.reads[Portfolio] - def debugPortfoliosFor(address: String, considerUnspent: Boolean, amountsAsStrings: Boolean = false): Future[Portfolio] = { - get(s"/debug/portfolios/$address?considerUnspent=$considerUnspent", withApiKey = true, amountsAsStrings = amountsAsStrings) - .as[Portfolio](amountsAsStrings) - } - def debugMinerInfo(): Future[Seq[State]] = getWithApiKey(s"/debug/minerInfo").as[Seq[State]] def transactionSerializer(body: JsObject): Future[TransactionSerialize] = @@ -958,7 +953,7 @@ object AsyncHttpApi extends Assertions { def assertBalances(acc: String, balance: Long, effectiveBalance: Long)(implicit pos: Position): Future[Unit] = for { - newBalance <- balanceDetails(acc) + newBalance <- balanceDetails(acc) } yield { withClue(s"effective balance of $acc") { newBalance.effective shouldBe effectiveBalance diff --git a/node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala b/node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala index ff185dce9fe..96c5ded54ad 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/api/SyncHttpApi.scala @@ -254,9 +254,6 @@ object SyncHttpApi extends Assertions with matchers.should.Matchers { def assetDistribution(asset: String): AssetDistribution = sync(async(n).assetDistribution(asset)) - def debugPortfoliosFor(address: String, considerUnspent: Boolean, amountsAsStrings: Boolean = false): Portfolio = - sync(async(n).debugPortfoliosFor(address, considerUnspent, amountsAsStrings)) - def broadcastIssue( source: KeyPair, name: String, diff --git a/node-it/src/test/scala/com/wavesplatform/it/sync/AmountAsStringSuite.scala b/node-it/src/test/scala/com/wavesplatform/it/sync/AmountAsStringSuite.scala index 2eb6aba2546..e74187b7c70 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/sync/AmountAsStringSuite.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/sync/AmountAsStringSuite.scala @@ -276,10 +276,6 @@ class AmountAsStringSuite extends BaseTransactionSuite with OverflowBlock { test("amount as string in debug api") { val firstBalance = sender.balanceDetails(firstAddress).available - val portfolio = sender.debugPortfoliosFor(firstAddress, considerUnspent = false, amountsAsStrings = true) - portfolio.balance shouldBe firstBalance - portfolio.lease.in shouldBe 0 - portfolio.lease.out shouldBe 0 sender.debugBalanceHistory(firstAddress, amountsAsStrings = true).head.balance shouldBe firstBalance diff --git a/node-it/src/test/scala/com/wavesplatform/it/sync/debug/DebugPortfoliosSuite.scala b/node-it/src/test/scala/com/wavesplatform/it/sync/debug/DebugPortfoliosSuite.scala deleted file mode 100644 index f3840120aef..00000000000 --- a/node-it/src/test/scala/com/wavesplatform/it/sync/debug/DebugPortfoliosSuite.scala +++ /dev/null @@ -1,57 +0,0 @@ -package com.wavesplatform.it.sync.debug - -import com.typesafe.config.Config -import com.wavesplatform.it.api.SyncHttpApi._ -import com.wavesplatform.it.sync._ -import com.wavesplatform.it.transactions.NodesFromDocker -import com.wavesplatform.it.{BaseFunSuite, NodeConfigs} -import com.wavesplatform.test._ - -class DebugPortfoliosSuite extends BaseFunSuite with NodesFromDocker { - override protected def nodeConfigs: Seq[Config] = - NodeConfigs.newBuilder - .overrideBase(_.quorum(0)) - .withDefault(entitiesNumber = 1) - .buildNonConflicting() - - private lazy val firstAcc = sender.createKeyPair() - private lazy val secondAcc = sender.createKeyPair() - - private lazy val firstAddress: String = firstAcc.toAddress.toString - private lazy val secondAddress: String = secondAcc.toAddress.toString - - override protected def beforeAll(): Unit = { - super.beforeAll() - sender.transfer(sender.keyPair, firstAddress, 20.waves, minFee, waitForTx = true) - sender.transfer(sender.keyPair, secondAddress, 20.waves, minFee, waitForTx = true) - } - - test("getting a balance considering pessimistic transactions from UTX pool - changed after UTX") { - val portfolioBefore = sender.debugPortfoliosFor(firstAddress, considerUnspent = true) - val utxSizeBefore = sender.utxSize - - sender.transfer(firstAcc, secondAddress, 5.waves, 5.waves) - sender.transfer(secondAcc, firstAddress, 7.waves, 5.waves) - - sender.waitForUtxIncreased(utxSizeBefore) - - val portfolioAfter = sender.debugPortfoliosFor(firstAddress, considerUnspent = true) - - val expectedBalance = portfolioBefore.balance - 10.waves // withdraw + fee - assert(portfolioAfter.balance == expectedBalance) - - } - - test("getting a balance without pessimistic transactions from UTX pool - not changed after UTX") { - nodes.waitForHeightArise() - - val portfolioBefore = sender.debugPortfoliosFor(firstAddress, considerUnspent = false) - val utxSizeBefore = sender.utxSize - - sender.transfer(firstAcc, secondAddress, 5.waves, fee = 5.waves) - sender.waitForUtxIncreased(utxSizeBefore) - - val portfolioAfter = sender.debugPortfoliosFor(firstAddress, considerUnspent = false) - assert(portfolioAfter.balance == portfolioBefore.balance) - } -} diff --git a/node/src/main/resources/swagger-ui/openapi.yaml b/node/src/main/resources/swagger-ui/openapi.yaml index 224a9841f16..db6ecf2deda 100644 --- a/node/src/main/resources/swagger-ui/openapi.yaml +++ b/node/src/main/resources/swagger-ui/openapi.yaml @@ -2579,63 +2579,6 @@ paths: security: - APIKey: [] x-codegen-request-body-name: address - '/debug/portfolios/{address}': - get: - tags: - - debug - summary: Portfolio - description: >- - Get current portfolio considering pessimistic transactions in the UTX - pool - operationId: getPortfolios - parameters: - - $ref: '#/components/parameters/address' - - name: considerUnspent - in: query - description: Taking into account pessimistic transactions from UTX pool - schema: - type: boolean - default: true - responses: - '200': - description: Json portfolio - content: - application/json: - schema: - required: - - assets - - balance - - lease - type: object - properties: - balance: - type: integer - format: int64 - lease: - required: - - in - - out - type: object - properties: - in: - type: integer - format: int64 - out: - type: integer - format: int64 - assets: - type: object - additionalProperties: - type: integer - format: int64 - description: map of assetId <-> balance - example: - assetId1: 0 - assetId2: 100 - '403': - $ref: '#/components/responses/ApiKeyNotValid' - security: - - APIKey: [] '/debug/rollback-to/{id}': delete: tags: diff --git a/node/src/main/scala/com/wavesplatform/Application.scala b/node/src/main/scala/com/wavesplatform/Application.scala index 54b094c9da8..da3613b5e66 100644 --- a/node/src/main/scala/com/wavesplatform/Application.scala +++ b/node/src/main/scala/com/wavesplatform/Application.scala @@ -122,7 +122,7 @@ class Application(val actorSystem: ActorSystem, val settings: WavesSettings, con val establishedConnections = new ConcurrentHashMap[Channel, PeerInfo] val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) val utxStorage = - new UtxPoolImpl(time, blockchainUpdater, spendableBalanceChanged, settings.utxSettings, utxEvents.onNext) + new UtxPoolImpl(time, blockchainUpdater, settings.utxSettings, utxEvents.onNext) maybeUtx = Some(utxStorage) val timer = new HashedWheelTimer() diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala index bd25bbf8e9e..884a800ecc3 100644 --- a/node/src/main/scala/com/wavesplatform/Importer.scala +++ b/node/src/main/scala/com/wavesplatform/Importer.scala @@ -3,6 +3,10 @@ package com.wavesplatform import java.io._ import java.net.{MalformedURLException, URL} +import scala.concurrent.{Await, Future} +import scala.concurrent.duration._ +import scala.util.{Failure, Success, Try} + import akka.actor.ActorSystem import com.google.common.io.ByteStreams import com.google.common.primitives.Ints @@ -12,7 +16,7 @@ import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonB import com.wavesplatform.block.{Block, BlockHeader} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.consensus.PoSSelector -import com.wavesplatform.database.{DBExt, KeyTags, openDB} +import com.wavesplatform.database.{openDB, DBExt, KeyTags} import com.wavesplatform.events.{BlockchainUpdateTriggers, UtxEvent} import com.wavesplatform.extensions.{Context, Extension} import com.wavesplatform.features.BlockchainFeatures @@ -21,26 +25,21 @@ import com.wavesplatform.lang.ValidationError import com.wavesplatform.mining.Miner import com.wavesplatform.protobuf.block.PBBlocks import com.wavesplatform.settings.WavesSettings -import com.wavesplatform.state.appender.BlockAppender import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, Diff, Height} +import com.wavesplatform.state.appender.BlockAppender +import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction} import com.wavesplatform.transaction.TxValidationError.GenericError import com.wavesplatform.transaction.smart.script.trace.TracedResult -import com.wavesplatform.transaction.{Asset, DiscardedBlocks, Transaction} import com.wavesplatform.utils._ import com.wavesplatform.utx.{UtxPool, UtxPoolImpl} import com.wavesplatform.wallet.Wallet import kamon.Kamon import monix.eval.Task import monix.execution.Scheduler -import monix.reactive.subjects.PublishSubject import monix.reactive.{Observable, Observer} import org.iq80.leveldb.DB import scopt.OParser -import scala.concurrent.duration._ -import scala.concurrent.{Await, Future} -import scala.util.{Failure, Success, Try} - object Importer extends ScorexLogging { import monix.execution.Scheduler.Implicits.global @@ -280,7 +279,7 @@ object Importer extends ScorexLogging { val db = openDB(settings.dbSettings.directory) val (blockchainUpdater, levelDb) = StorageFactory(settings, db, time, Observer.empty, BlockchainUpdateTriggers.combined(triggers)) - val utxPool = new UtxPoolImpl(time, blockchainUpdater, PublishSubject(), settings.utxSettings) + val utxPool = new UtxPoolImpl(time, blockchainUpdater, settings.utxSettings) val pos = PoSSelector(blockchainUpdater, settings.synchronizationSettings.maxBaseTarget) val extAppender = BlockAppender(blockchainUpdater, time, utxPool, pos, scheduler, importOptions.verify) _ diff --git a/node/src/main/scala/com/wavesplatform/api/http/DebugApiRoute.scala b/node/src/main/scala/com/wavesplatform/api/http/DebugApiRoute.scala index 1ac15bf5aa4..6eeebc220e3 100644 --- a/node/src/main/scala/com/wavesplatform/api/http/DebugApiRoute.scala +++ b/node/src/main/scala/com/wavesplatform/api/http/DebugApiRoute.scala @@ -3,12 +3,16 @@ package com.wavesplatform.api.http import java.net.{InetAddress, InetSocketAddress, URI} import java.util.concurrent.ConcurrentMap +import scala.concurrent.{ExecutionContext, Future} +import scala.concurrent.duration._ +import scala.util.{Failure, Success} +import scala.util.control.NonFatal + import akka.http.scaladsl.common.{EntityStreamingSupport, JsonEntityStreamingSupport} import akka.http.scaladsl.model.StatusCodes import akka.http.scaladsl.model.headers.Accept import akka.http.scaladsl.server.Route import akka.stream.scaladsl.Source -import cats.kernel.Monoid import cats.syntax.either._ import com.typesafe.config.{ConfigObject, ConfigRenderOptions} import com.wavesplatform.account.Address @@ -20,12 +24,12 @@ import com.wavesplatform.lang.ValidationError import com.wavesplatform.mining.{Miner, MinerDebugInfo} import com.wavesplatform.network.{PeerDatabase, PeerInfo, _} import com.wavesplatform.settings.{RestAPISettings, WavesSettings} +import com.wavesplatform.state.{Blockchain, Height, LeaseBalance, NG, Portfolio, StateHash} import com.wavesplatform.state.diffs.TransactionDiffer import com.wavesplatform.state.reader.CompositeBlockchain -import com.wavesplatform.state.{Blockchain, Height, LeaseBalance, NG, Portfolio, StateHash} +import com.wavesplatform.transaction._ import com.wavesplatform.transaction.Asset.IssuedAsset import com.wavesplatform.transaction.TxValidationError.{GenericError, InvalidRequestSignature} -import com.wavesplatform.transaction._ import com.wavesplatform.transaction.smart.script.trace.{InvokeScriptTrace, TracedResult} import com.wavesplatform.transaction.smart.InvokeScriptTransaction import com.wavesplatform.utils.{ScorexLogging, Time} @@ -34,13 +38,8 @@ import com.wavesplatform.wallet.Wallet import io.netty.channel.Channel import monix.eval.{Coeval, Task} import monix.execution.Scheduler -import play.api.libs.json.Json.JsValueWrapper import play.api.libs.json._ - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -import scala.util.control.NonFatal -import scala.util.{Failure, Success} +import play.api.libs.json.Json.JsValueWrapper case class DebugApiRoute( ws: WavesSettings, @@ -80,7 +79,7 @@ case class DebugApiRoute( override lazy val route: Route = pathPrefix("debug") { stateChanges ~ balanceHistory ~ stateHash ~ validate ~ withAuth { - state ~ info ~ stateWaves ~ rollback ~ rollbackTo ~ blacklist ~ portfolios ~ minerInfo ~ configInfo ~ print + state ~ info ~ stateWaves ~ rollback ~ rollbackTo ~ blacklist ~ minerInfo ~ configInfo ~ print } } @@ -90,18 +89,6 @@ case class DebugApiRoute( "" }) - def portfolios: Route = path("portfolios" / AddrSegment) { address => - (get & parameter("considerUnspent".as[Boolean].?)) { considerUnspent => - extractScheduler { implicit s => - complete(accountsApi.portfolio(address).toListL.runToFuture.map { assetList => - val bd = accountsApi.balanceDetails(address) - val base = Portfolio(bd.regular, LeaseBalance(bd.leaseIn, bd.leaseOut), assetList.toMap) - if (considerUnspent.getOrElse(true)) Monoid.combine(base, utxStorage.pessimisticPortfolio(address)) else base - }) - } - } - } - def balanceHistory: Route = (path("balances" / "history" / AddrSegment) & get) { address => complete(Json.toJson(loadBalanceHistory(address).map { case (h, b) => Json.obj("height" -> h, "balance" -> b) @@ -246,21 +233,25 @@ case class DebugApiRoute( val transactionJson = parsedTransaction.fold(_ => jsv, _.json()) val serializer = tracedDiff.resultE - .fold(_ => this.serializer, { case (_, diff) => - val compositeBlockchain = CompositeBlockchain(blockchain, diff) - this.serializer.copy(blockchain = compositeBlockchain) + .fold(_ => this.serializer, { + case (_, diff) => + val compositeBlockchain = CompositeBlockchain(blockchain, diff) + this.serializer.copy(blockchain = compositeBlockchain) }) val extendedJson = tracedDiff.resultE - .fold(_ => jsv, { case (tx, diff) => - val meta = tx match { - case ist: InvokeScriptTransaction => - val result = diff.scriptResults.get(ist.id()) - TransactionMeta.Invoke(Height(blockchain.height), ist, succeeded = true, result) - case tx => TransactionMeta.Default(Height(blockchain.height), tx, succeeded = true) + .fold( + _ => jsv, { + case (tx, diff) => + val meta = tx match { + case ist: InvokeScriptTransaction => + val result = diff.scriptResults.get(ist.id()) + TransactionMeta.Invoke(Height(blockchain.height), ist, succeeded = true, result) + case tx => TransactionMeta.Default(Height(blockchain.height), tx, succeeded = true) + } + serializer.transactionWithMetaJson(meta) } - serializer.transactionWithMetaJson(meta) - }) + ) val response = Json.obj( "valid" -> error.isEmpty, diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala index 93633fee04f..fc5cfab67dd 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPool.scala @@ -1,21 +1,17 @@ package com.wavesplatform.utx -import com.wavesplatform.account.Address +import scala.concurrent.duration.FiniteDuration + import com.wavesplatform.common.state.ByteStr import com.wavesplatform.lang.ValidationError import com.wavesplatform.mining.MultiDimensionalMiningConstraint -import com.wavesplatform.state.Portfolio import com.wavesplatform.transaction._ import com.wavesplatform.transaction.smart.script.trace.TracedResult import com.wavesplatform.utx.UtxPool.PackStrategy -import scala.concurrent.duration.FiniteDuration - trait UtxPool extends AutoCloseable { def putIfNew(tx: Transaction, forceValidate: Boolean = false): TracedResult[ValidationError, Boolean] def removeAll(txs: Iterable[Transaction]): Unit - def spendableBalance(addr: Address, assetId: Asset): Long - def pessimisticPortfolio(addr: Address): Portfolio def all: Seq[Transaction] def size: Int def transactionById(transactionId: ByteStr): Option[Transaction] diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala index 186c262f038..e0ba5b2c1de 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPoolImpl.scala @@ -19,7 +19,7 @@ import com.wavesplatform.lang.ValidationError import com.wavesplatform.metrics._ import com.wavesplatform.mining.MultiDimensionalMiningConstraint import com.wavesplatform.settings.UtxSettings -import com.wavesplatform.state.{Blockchain, Diff, Portfolio} +import com.wavesplatform.state.{Blockchain, Diff} import com.wavesplatform.state.InvokeScriptResult.ErrorMessage import com.wavesplatform.state.diffs.TransactionDiffer import com.wavesplatform.state.diffs.TransactionDiffer.TransactionValidationError @@ -39,14 +39,12 @@ import kamon.metric.MeasurementUnit import monix.execution.ExecutionModel import monix.execution.atomic.AtomicBoolean import monix.execution.schedulers.SchedulerService -import monix.reactive.Observer import org.slf4j.LoggerFactory //noinspection ScalaStyle class UtxPoolImpl( time: Time, blockchain: Blockchain, - spendableBalanceChanged: Observer[(Address, Asset)], utxSettings: UtxSettings, onEvent: UtxEvent => Unit = _ => (), nanoTimeSource: () => TxTimestamp = () => System.nanoTime() @@ -63,9 +61,7 @@ class UtxPoolImpl( Schedulers.singleThread("utx-pool-cleanup", executionModel = ExecutionModel.AlwaysAsyncExecution) // State - private[this] val transactions = new ConcurrentHashMap[ByteStr, Transaction]() - private[this] val pessimisticPortfolios = new PessimisticPortfolios(spendableBalanceChanged, blockchain.transactionMeta(_).isDefined) // TODO delete in the future - + private[this] val transactions = new ConcurrentHashMap[ByteStr, Transaction]() private[this] val inUTXPoolOrdering = TransactionsOrdering.InUTXPool(utxSettings.fastLaneAddresses) override def putIfNew(tx: Transaction, forceValidate: Boolean): TracedResult[ValidationError, Boolean] = { @@ -195,7 +191,6 @@ class UtxPoolImpl( private[this] def removeFromOrdPool(txId: ByteStr): Option[Transaction] = { for (tx <- Option(transactions.remove(txId))) yield { PoolMetrics.removeTransaction(tx) - pessimisticPortfolios.remove(txId) tx } } @@ -228,7 +223,6 @@ class UtxPoolImpl( } def addPortfolio(): Unit = diffEi.map { diff => - pessimisticPortfolios.add(tx.id(), diff) onEvent(UtxEvent.TxAdded(tx, diff)) } @@ -244,19 +238,6 @@ class UtxPoolImpl( diffEi.map(_ => true) } - override def spendableBalance(addr: Address, assetId: Asset): Long = - blockchain.balance(addr, assetId) - - assetId.fold(blockchain.leaseBalance(addr).out)(_ => 0L) + - pessimisticPortfolios - .getAggregated(addr) - .spendableBalanceOf(assetId) - - override def pessimisticPortfolio(addr: Address): Portfolio = { - val priority = priorityPool.pessimisticPortfolios(addr) - val pessimistic = pessimisticPortfolios.getAggregated(addr) - Monoid.combineAll(priority :+ pessimistic) - } - private[utx] def nonPriorityTransactions: Seq[Transaction] = { transactions.values.asScala.toVector .sorted(inUTXPoolOrdering) @@ -350,7 +331,7 @@ class UtxPoolImpl( else { val updatedBlockchain = CompositeBlockchain(blockchain, r.totalDiff) val newCheckedAddresses = newScriptedAddresses ++ r.checkedAddresses - val e = differ(updatedBlockchain, tx).resultE + val e = differ(updatedBlockchain, tx).resultE e match { case Right(newDiff) => val updatedConstraint = r.constraint.put(updatedBlockchain, tx, newDiff) @@ -568,50 +549,4 @@ private object UtxPoolImpl { validatedTransactions: Set[ByteStr], removedTransactions: Set[ByteStr] ) - - class PessimisticPortfolios(spendableBalanceChanged: Observer[(Address, Asset)], isTxKnown: ByteStr => Boolean) { - private type Portfolios = Map[Address, Portfolio] - private val transactionPortfolios = new ConcurrentHashMap[ByteStr, Portfolios]() - private val transactions = new ConcurrentHashMap[Address, Set[ByteStr]]() - - def add(txId: ByteStr, txDiff: Diff): Unit = { - val pessimisticPortfolios = txDiff.portfolios.map { case (addr, portfolio) => addr -> portfolio.pessimistic } - val nonEmptyPessimisticPortfolios = pessimisticPortfolios.filterNot { case (_, portfolio) => portfolio.isEmpty } - - if (nonEmptyPessimisticPortfolios.nonEmpty && - Option(transactionPortfolios.put(txId, nonEmptyPessimisticPortfolios)).isEmpty) { - nonEmptyPessimisticPortfolios.keys.foreach { address => - transactions.put(address, transactions.getOrDefault(address, Set.empty) + txId) - } - } - - // Because we need to notify about balance changes when they are applied - pessimisticPortfolios.foreach { - case (addr, p) => p.assetIds.foreach(assetId => spendableBalanceChanged.onNext(addr -> assetId)) - } - } - - def getAggregated(accountAddr: Address): Portfolio = { - val portfolios = for { - txId <- transactions.getOrDefault(accountAddr, Set.empty).toSeq - if !isTxKnown(txId) - txPortfolios = transactionPortfolios.getOrDefault(txId, Map.empty[Address, Portfolio]) - txAccountPortfolio <- txPortfolios.get(accountAddr).toSeq - } yield txAccountPortfolio - - Monoid.combineAll(portfolios) - } - - def remove(txId: ByteStr): Unit = { - Option(transactionPortfolios.remove(txId)) match { - case Some(txPortfolios) => - txPortfolios.foreach { - case (addr, p) => - transactions.computeIfPresent(addr, (_, prevTxs) => prevTxs - txId) - p.assetIds.foreach(assetId => spendableBalanceChanged.onNext(addr -> assetId)) - } - case None => - } - } - } } diff --git a/node/src/main/scala/com/wavesplatform/utx/UtxPriorityPool.scala b/node/src/main/scala/com/wavesplatform/utx/UtxPriorityPool.scala index 4f16d960d0b..653a10db9cf 100644 --- a/node/src/main/scala/com/wavesplatform/utx/UtxPriorityPool.scala +++ b/node/src/main/scala/com/wavesplatform/utx/UtxPriorityPool.scala @@ -7,9 +7,8 @@ import scala.annotation.tailrec import cats.kernel.Monoid import com.wavesplatform.ResponsivenessLogs -import com.wavesplatform.account.Address import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.state.{Blockchain, Diff, Portfolio} +import com.wavesplatform.state.{Blockchain, Diff} import com.wavesplatform.state.reader.CompositeBlockchain import com.wavesplatform.transaction.Transaction import com.wavesplatform.utils.{OptimisticLockable, ScorexLogging} @@ -89,12 +88,6 @@ final class UtxPriorityPool(base: Blockchain) extends ScorexLogging with Optimis def contains(txId: ByteStr): Boolean = transactionById(txId).nonEmpty - def pessimisticPortfolios(addr: Address): Seq[Portfolio] = - for { - diff <- validPriorityDiffs - (a, pf) <- diff.portfolios if a == addr - } yield pf.pessimistic - def nextMicroBlockSize(limit: Int): Int = { @tailrec def nextMicroBlockSizeRec(last: Int, diffs: Seq[Diff]): Int = (diffs: @unchecked) match { diff --git a/node/src/test/scala/com/wavesplatform/history/Domain.scala b/node/src/test/scala/com/wavesplatform/history/Domain.scala index 898b25002aa..703326f4cd1 100644 --- a/node/src/test/scala/com/wavesplatform/history/Domain.scala +++ b/node/src/test/scala/com/wavesplatform/history/Domain.scala @@ -29,7 +29,6 @@ import com.wavesplatform.utils.SystemTime import com.wavesplatform.utx.UtxPoolImpl import com.wavesplatform.wallet.Wallet import monix.execution.Scheduler.Implicits.global -import monix.reactive.Observer import org.iq80.leveldb.DB case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWriter: LevelDBWriter, settings: WavesSettings) { @@ -45,7 +44,7 @@ case class Domain(db: DB, blockchainUpdater: BlockchainUpdaterImpl, levelDBWrite val transactionDiffer: Transaction => TracedResult[ValidationError, Diff] = TransactionDiffer(blockchain.lastBlockTimestamp, System.currentTimeMillis())(blockchain, _) - lazy val utxPool = new UtxPoolImpl(SystemTime, blockchain, Observer.empty, settings.utxSettings) + lazy val utxPool = new UtxPoolImpl(SystemTime, blockchain, settings.utxSettings) lazy val wallet = Wallet(settings.walletSettings.copy(file = None)) object commonApi { diff --git a/node/src/test/scala/com/wavesplatform/mining/BlockV5Test.scala b/node/src/test/scala/com/wavesplatform/mining/BlockV5Test.scala index cf4927210e7..b2e700b2c5e 100644 --- a/node/src/test/scala/com/wavesplatform/mining/BlockV5Test.scala +++ b/node/src/test/scala/com/wavesplatform/mining/BlockV5Test.scala @@ -2,11 +2,15 @@ package com.wavesplatform.mining import java.util.concurrent.atomic.AtomicReference +import scala.concurrent.Await +import scala.concurrent.duration._ + import com.typesafe.config.ConfigFactory +import com.wavesplatform.{crypto, protobuf, BlocksTransactionsHelpers, TestTime} import com.wavesplatform.account.{AddressOrAlias, KeyPair} +import com.wavesplatform.block.{Block, BlockHeader, SignedBlockHeader} import com.wavesplatform.block.serialization.{BlockHeaderSerializer, BlockSerializer} import com.wavesplatform.block.validation.Validators -import com.wavesplatform.block.{Block, BlockHeader, SignedBlockHeader} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.common.utils.EitherExt2 import com.wavesplatform.consensus.PoSSelector @@ -17,16 +21,15 @@ import com.wavesplatform.lagonaki.mocks.TestBlock import com.wavesplatform.lang.ValidationError import com.wavesplatform.protobuf.block.PBBlocks import com.wavesplatform.settings.{Constants, FunctionalitySettings, TestFunctionalitySettings, WalletSettings, WavesSettings} +import com.wavesplatform.state.{diffs, Blockchain, BlockchainUpdaterImpl, NG} import com.wavesplatform.state.appender.BlockAppender -import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, NG, diffs} import com.wavesplatform.test.FlatSpec +import com.wavesplatform.transaction.{BlockchainUpdater, GenesisTransaction, Transaction, TxVersion} import com.wavesplatform.transaction.Asset.Waves import com.wavesplatform.transaction.transfer.TransferTransaction -import com.wavesplatform.transaction.{BlockchainUpdater, GenesisTransaction, Transaction, TxVersion} import com.wavesplatform.utils.Time import com.wavesplatform.utx.UtxPoolImpl import com.wavesplatform.wallet.Wallet -import com.wavesplatform.{BlocksTransactionsHelpers, TestTime, crypto, protobuf} import io.netty.channel.group.DefaultChannelGroup import io.netty.util.concurrent.GlobalEventExecutor import monix.eval.Task @@ -36,9 +39,6 @@ import org.scalacheck.Gen import org.scalatest._ import org.scalatest.enablers.Length -import scala.concurrent.Await -import scala.concurrent.duration._ - class BlockV5Test extends FlatSpec with WithDomain @@ -469,7 +469,7 @@ class BlockV5Test val pos = PoSSelector(blockchain, settings.synchronizationSettings.maxBaseTarget) val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) val wallet = Wallet(WalletSettings(None, Some("123"), None)) - val utxPool = new UtxPoolImpl(time, blockchain, Observer.stopped, settings.utxSettings) + val utxPool = new UtxPoolImpl(time, blockchain, settings.utxSettings) val minerScheduler = Scheduler.singleThread("miner") val appenderScheduler = Scheduler.singleThread("appender") val miner = new MinerImpl(allChannels, blockchain, settings, time, utxPool, wallet, pos, minerScheduler, appenderScheduler) diff --git a/node/src/test/scala/com/wavesplatform/mining/BlockWithMaxBaseTargetTest.scala b/node/src/test/scala/com/wavesplatform/mining/BlockWithMaxBaseTargetTest.scala index 2baced0a77d..6f45aa4b30f 100644 --- a/node/src/test/scala/com/wavesplatform/mining/BlockWithMaxBaseTargetTest.scala +++ b/node/src/test/scala/com/wavesplatform/mining/BlockWithMaxBaseTargetTest.scala @@ -3,6 +3,9 @@ package com.wavesplatform.mining import java.security.Permission import java.util.concurrent.{Semaphore, TimeUnit} +import scala.concurrent.Await +import scala.concurrent.duration._ + import com.typesafe.config.ConfigFactory import com.wavesplatform.account.KeyPair import com.wavesplatform.block.Block @@ -30,9 +33,6 @@ import monix.execution.Scheduler import monix.execution.schedulers.SchedulerService import org.scalacheck.{Arbitrary, Gen} -import scala.concurrent.Await -import scala.concurrent.duration._ - class BlockWithMaxBaseTargetTest extends FreeSpec with WithDB with DBCacheSettings { "base target limit" - { @@ -134,7 +134,7 @@ class BlockWithMaxBaseTargetTest extends FreeSpec with WithDB with DBCacheSettin val bcu = new BlockchainUpdaterImpl(defaultWriter, ignoreSpendableBalanceChanged, settings, ntpTime, ignoreBlockchainUpdateTriggers, (_, _) => Seq.empty) val pos = PoSSelector(bcu, settings.synchronizationSettings.maxBaseTarget) - val utxPoolStub = new UtxPoolImpl(ntpTime, bcu, ignoreSpendableBalanceChanged, settings0.utxSettings) + val utxPoolStub = new UtxPoolImpl(ntpTime, bcu, settings0.utxSettings) val schedulerService: SchedulerService = Scheduler.singleThread("appender") try { diff --git a/node/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala b/node/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala index 5643b59e385..0fb8dfc6a7e 100644 --- a/node/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala +++ b/node/src/test/scala/com/wavesplatform/mining/MicroBlockMinerSpec.scala @@ -1,5 +1,8 @@ package com.wavesplatform.mining +import scala.concurrent.duration._ +import scala.util.Random + import com.wavesplatform.TestValues import com.wavesplatform.account.Alias import com.wavesplatform.block.Block @@ -16,9 +19,6 @@ import com.wavesplatform.utx.UtxPoolImpl import monix.execution.Scheduler import org.scalamock.scalatest.PathMockFactory -import scala.concurrent.duration._ -import scala.util.Random - class MicroBlockMinerSpec extends FlatSpec with PathMockFactory with WithDomain { "Micro block miner" should "generate microblocks in flat interval" in { val scheduler = Schedulers.singleThread("test") @@ -27,7 +27,7 @@ class MicroBlockMinerSpec extends FlatSpec with PathMockFactory with WithDomain val settings = domainSettingsWithFS(TestFunctionalitySettings.withFeatures(BlockchainFeatures.NG)) withDomain(settings) { d => d.appendBlock(TestBlock.create(Seq(genesis))) - val utxPool = new UtxPoolImpl(ntpTime, d.blockchainUpdater, ignoreSpendableBalanceChanged, settings.utxSettings) + val utxPool = new UtxPoolImpl(ntpTime, d.blockchainUpdater, settings.utxSettings) val microBlockMiner = new MicroBlockMinerImpl( _ => (), null, diff --git a/node/src/test/scala/com/wavesplatform/mining/MiningFailuresSuite.scala b/node/src/test/scala/com/wavesplatform/mining/MiningFailuresSuite.scala index 0d737fdd16c..d34aa67797b 100644 --- a/node/src/test/scala/com/wavesplatform/mining/MiningFailuresSuite.scala +++ b/node/src/test/scala/com/wavesplatform/mining/MiningFailuresSuite.scala @@ -8,8 +8,8 @@ import com.wavesplatform.common.state.ByteStr import com.wavesplatform.consensus.PoSSelector import com.wavesplatform.lagonaki.mocks.TestBlock import com.wavesplatform.settings._ +import com.wavesplatform.state.{BalanceSnapshot, Blockchain, BlockMinerInfo, NG} import com.wavesplatform.state.diffs.ENOUGH_AMT -import com.wavesplatform.state.{BalanceSnapshot, BlockMinerInfo, Blockchain, NG} import com.wavesplatform.test.FlatSpec import com.wavesplatform.transaction.BlockchainUpdater import com.wavesplatform.transaction.TxValidationError.BlockFromFuture @@ -53,7 +53,7 @@ class MiningFailuresSuite extends FlatSpec with PathMockFactory with WithDB { val scheduler = Scheduler.singleThread("appender") val allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) val wallet = Wallet(WalletSettings(None, Some("123"), None)) - val utxPool = new UtxPoolImpl(ntpTime, blockchainUpdater, ignoreSpendableBalanceChanged, wavesSettings.utxSettings) + val utxPool = new UtxPoolImpl(ntpTime, blockchainUpdater, wavesSettings.utxSettings) val pos = PoSSelector(blockchainUpdater, wavesSettings.synchronizationSettings.maxBaseTarget) new MinerImpl( allChannels, diff --git a/node/src/test/scala/com/wavesplatform/mining/MiningWithRewardSuite.scala b/node/src/test/scala/com/wavesplatform/mining/MiningWithRewardSuite.scala index 2e4007bcfd9..74d7f238ac6 100644 --- a/node/src/test/scala/com/wavesplatform/mining/MiningWithRewardSuite.scala +++ b/node/src/test/scala/com/wavesplatform/mining/MiningWithRewardSuite.scala @@ -1,7 +1,11 @@ package com.wavesplatform.mining +import scala.concurrent.Future +import scala.concurrent.duration._ + import cats.effect.Resource import com.typesafe.config.ConfigFactory +import com.wavesplatform.{TransactionGen, WithDB} import com.wavesplatform.account.KeyPair import com.wavesplatform.block.Block import com.wavesplatform.common.state.ByteStr @@ -12,14 +16,13 @@ import com.wavesplatform.db.DBCacheSettings import com.wavesplatform.features.{BlockchainFeature, BlockchainFeatures} import com.wavesplatform.lagonaki.mocks.TestBlock import com.wavesplatform.settings._ -import com.wavesplatform.state.diffs.ENOUGH_AMT import com.wavesplatform.state.{Blockchain, BlockchainUpdaterImpl, NG} +import com.wavesplatform.state.diffs.ENOUGH_AMT +import com.wavesplatform.transaction.{BlockchainUpdater, GenesisTransaction, Transaction} import com.wavesplatform.transaction.Asset.Waves import com.wavesplatform.transaction.transfer.TransferTransaction -import com.wavesplatform.transaction.{BlockchainUpdater, GenesisTransaction, Transaction} import com.wavesplatform.utx.UtxPoolImpl import com.wavesplatform.wallet.Wallet -import com.wavesplatform.{TransactionGen, WithDB} import io.netty.channel.group.DefaultChannelGroup import io.netty.util.concurrent.GlobalEventExecutor import monix.eval.Task @@ -30,9 +33,6 @@ import org.scalatest.compatible.Assertion import org.scalatest.flatspec.AsyncFlatSpec import org.scalatest.matchers.should.Matchers -import scala.concurrent.Future -import scala.concurrent.duration._ - class MiningWithRewardSuite extends AsyncFlatSpec with Matchers with WithDB with TransactionGen with DBCacheSettings { import MiningWithRewardSuite._ @@ -122,7 +122,7 @@ class MiningWithRewardSuite extends AsyncFlatSpec with Matchers with WithDB with for { _ <- Task.unit pos = PoSSelector(blockchainUpdater, settings.synchronizationSettings.maxBaseTarget) - utxPool = new UtxPoolImpl(ntpTime, blockchainUpdater, ignoreSpendableBalanceChanged, settings.utxSettings) + utxPool = new UtxPoolImpl(ntpTime, blockchainUpdater, settings.utxSettings) scheduler = Scheduler.singleThread("appender") allChannels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE) wallet = Wallet(WalletSettings(None, Some("123"), None)) diff --git a/node/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala b/node/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala index 088a0398053..05ac534fcd7 100644 --- a/node/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala +++ b/node/src/test/scala/com/wavesplatform/state/appender/ExtensionAppenderSpec.scala @@ -10,11 +10,10 @@ import com.wavesplatform.transaction.TxHelpers import com.wavesplatform.utils.SystemTime import com.wavesplatform.utx.UtxPoolImpl import monix.execution.Scheduler.Implicits.global -import monix.reactive.subjects.ConcurrentSubject class ExtensionAppenderSpec extends FlatSpec with WithDomain { "Extension appender" should "drop duplicate transactions from UTX" in withDomain() { d => - val utx = new UtxPoolImpl(SystemTime, d.blockchain, ConcurrentSubject.publish, SettingsFromDefaultConfig.utxSettings) + val utx = new UtxPoolImpl(SystemTime, d.blockchain, SettingsFromDefaultConfig.utxSettings) val time = new TestTime() val extensionAppender = ExtensionAppender(d.blockchain, utx, d.posSelector, time, InvalidBlockStorage.NoOp, PeerDatabase.NoOp, global)(null, _) diff --git a/node/src/test/scala/com/wavesplatform/transaction/InvokeScriptComplexitySpec.scala b/node/src/test/scala/com/wavesplatform/transaction/InvokeScriptComplexitySpec.scala index 6b98f40e2fa..72632706f3b 100644 --- a/node/src/test/scala/com/wavesplatform/transaction/InvokeScriptComplexitySpec.scala +++ b/node/src/test/scala/com/wavesplatform/transaction/InvokeScriptComplexitySpec.scala @@ -13,10 +13,9 @@ import com.wavesplatform.lang.v1.estimator.v3.ScriptEstimatorV3 import com.wavesplatform.test._ import com.wavesplatform.transaction.Asset.Waves import com.wavesplatform.transaction.assets.IssueTransaction -import com.wavesplatform.transaction.smart.script.ScriptCompiler import com.wavesplatform.transaction.smart.{InvokeScriptTransaction, SetScriptTransaction} +import com.wavesplatform.transaction.smart.script.ScriptCompiler import com.wavesplatform.utx.UtxPoolImpl -import monix.reactive.Observer class InvokeScriptComplexitySpec extends FreeSpec with WithDomain with NTPTime { private[this] val dApp1 = TestCompiler(V5).compileContract(""" @@ -82,7 +81,7 @@ class InvokeScriptComplexitySpec extends FreeSpec with WithDomain with NTPTime { "correctly estimates complexity when child dApp invocation involves payment in smart asset" in forAll(gen) { case (invoker, dApp0KP, dApp1KP) => withDomain(settings) { d => - val utx = new UtxPoolImpl(ntpTime, d.blockchain, Observer.stopped, settings.utxSettings) + val utx = new UtxPoolImpl(ntpTime, d.blockchain, settings.utxSettings) d.appendBlock( Seq(invoker.toAddress, dApp0KP.toAddress, dApp1KP.toAddress) diff --git a/node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala b/node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala index 37d40ef99b2..aa6a0cf0d15 100644 --- a/node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala +++ b/node/src/test/scala/com/wavesplatform/utx/UtxFailedTxsSpec.scala @@ -1,5 +1,7 @@ package com.wavesplatform.utx +import scala.concurrent.duration._ + import com.wavesplatform.TestValues import com.wavesplatform.common.utils._ import com.wavesplatform.db.WithDomain @@ -16,11 +18,8 @@ import com.wavesplatform.transaction.TxHelpers import com.wavesplatform.transaction.assets.exchange.OrderType import com.wavesplatform.transaction.smart.InvokeScriptTransaction.Payment import com.wavesplatform.transaction.smart.script.ScriptCompiler -import monix.reactive.subjects.PublishSubject import org.scalatest.concurrent.Eventually -import scala.concurrent.duration._ - //noinspection RedundantDefaultArgument class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { val dApp = TxHelpers.secondSigner @@ -298,7 +297,7 @@ class UtxFailedTxsSpec extends FlatSpec with WithDomain with Eventually { TxHelpers.genesis(dApp.toAddress, Long.MaxValue / 3) ) - val utx = new UtxPoolImpl(ntpTime, d.blockchainUpdater, PublishSubject(), settings.utxSettings) + val utx = new UtxPoolImpl(ntpTime, d.blockchainUpdater, settings.utxSettings) f(d, utx) utx.close() } diff --git a/node/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala b/node/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala index c83ac783e25..798444a5202 100644 --- a/node/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala +++ b/node/src/test/scala/com/wavesplatform/utx/UtxPoolSpecification.scala @@ -1,5 +1,11 @@ package com.wavesplatform.utx +import java.nio.file.{Files, Path} + +import scala.collection.mutable.ListBuffer +import scala.concurrent.duration._ +import scala.util.Random + import cats.data.NonEmptyList import com.wavesplatform import com.wavesplatform._ @@ -8,17 +14,18 @@ import com.wavesplatform.block.{Block, SignedBlockHeader} import com.wavesplatform.common.state.ByteStr import com.wavesplatform.common.utils.EitherExt2 import com.wavesplatform.consensus.TransactionsOrdering -import com.wavesplatform.database.{LevelDBWriter, TestStorageFactory, openDB} +import com.wavesplatform.database.{openDB, LevelDBWriter, TestStorageFactory} import com.wavesplatform.db.WithDomain import com.wavesplatform.events.UtxEvent import com.wavesplatform.features.BlockchainFeatures +import com.wavesplatform.history.{randomSig, settingsWithFeatures, DefaultWavesSettings} import com.wavesplatform.history.Domain.BlockchainUpdaterExt -import com.wavesplatform.history.{DefaultWavesSettings, randomSig, settingsWithFeatures} import com.wavesplatform.lagonaki.mocks.TestBlock +import com.wavesplatform.lang.directives.values.StdLibVersion.V6 import com.wavesplatform.lang.script.Script import com.wavesplatform.lang.script.v1.ExprScript -import com.wavesplatform.lang.v1.compiler.Terms.EXPR import com.wavesplatform.lang.v1.compiler.{CompilerContext, ExpressionCompiler, TestCompiler} +import com.wavesplatform.lang.v1.compiler.Terms.EXPR import com.wavesplatform.lang.v1.estimator.ScriptEstimatorV1 import com.wavesplatform.lang.v1.estimator.v3.ScriptEstimatorV3 import com.wavesplatform.mining._ @@ -26,30 +33,23 @@ import com.wavesplatform.settings._ import com.wavesplatform.state._ import com.wavesplatform.state.diffs._ import com.wavesplatform.state.utils.TestLevelDB +import com.wavesplatform.test.FreeSpec +import com.wavesplatform.transaction.{Asset, Transaction, _} import com.wavesplatform.transaction.Asset.Waves import com.wavesplatform.transaction.TxValidationError.{GenericError, SenderIsBlacklisted} -import com.wavesplatform.transaction.smart.script.ScriptCompiler import com.wavesplatform.transaction.smart.{InvokeScriptTransaction, SetScriptTransaction} -import com.wavesplatform.transaction.transfer.MassTransferTransaction.ParsedTransfer +import com.wavesplatform.transaction.smart.script.ScriptCompiler import com.wavesplatform.transaction.transfer._ -import com.wavesplatform.transaction.{Asset, Transaction, _} +import com.wavesplatform.transaction.transfer.MassTransferTransaction.ParsedTransfer import com.wavesplatform.utils.Time import com.wavesplatform.utx.UtxPool.PackStrategy import monix.reactive.subjects.PublishSubject import org.iq80.leveldb.DB -import org.scalacheck.Gen._ import org.scalacheck.{Arbitrary, Gen} +import org.scalacheck.Gen._ import org.scalamock.scalatest.MockFactory import org.scalatest.concurrent.Eventually import org.scalatest.EitherValues -import java.nio.file.{Files, Path} - -import com.wavesplatform.lang.directives.values.StdLibVersion.V6 -import com.wavesplatform.test.FreeSpec - -import scala.collection.mutable.ListBuffer -import scala.concurrent.duration._ -import scala.util.Random private object UtxPoolSpecification { private val ignoreSpendableBalanceChanged = PublishSubject[(Address, Asset)]() @@ -189,11 +189,7 @@ class UtxPoolSpecification } yield { val time = new TestTime() val utx = - new UtxPoolImpl( - time, - bcu, - ignoreSpendableBalanceChanged, - UtxSettings( + new UtxPoolImpl(time, bcu, UtxSettings( 10, PoolDefaultMaxBytes, 1000, @@ -202,71 +198,12 @@ class UtxPoolSpecification Set.empty, allowTransactionsFromSmartAccounts = true, allowSkipChecks = false - ) - ) + )) val amountPart = (senderBalance - fee) / 2 - fee val txs = for (_ <- 1 to n) yield createWavesTransfer(sender, recipient.toAddress, amountPart, fee, time.getTimestamp()).explicitGet() (utx, time, txs, (offset + 1000).millis) }).label("twoOutOfManyValidPayments") - private val emptyUtxPool = stateGen - .map { - case (sender, _, bcu) => - val time = new TestTime() - val utxPool = - new UtxPoolImpl( - time, - bcu, - ignoreSpendableBalanceChanged, - UtxSettings( - 10, - PoolDefaultMaxBytes, - 1000, - Set.empty, - Set.empty, - Set.empty, - allowTransactionsFromSmartAccounts = true, - allowSkipChecks = false - ) - ) - (sender, bcu, utxPool) - } - .label("emptyUtxPool") - - private val withValidPayments = (for { - (sender, senderBalance, bcu) <- stateGen - recipient <- accountGen - time = new TestTime() - txs <- Gen.nonEmptyListOf(transferWithRecipient(sender, recipient.publicKey, senderBalance / 10, time)) - } yield { - val settings = - UtxSettings(10, PoolDefaultMaxBytes, 1000, Set.empty, Set.empty, Set.empty, allowTransactionsFromSmartAccounts = true, allowSkipChecks = false) - val utxPool = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, settings) - txs.foreach(utxPool.putIfNew(_)) - (sender, bcu, utxPool, time, settings) - }).label("withValidPayments") - - private val withValidPaymentsNotAdded = (for { - (sender, senderBalance, bcu) <- stateGen - recipient <- accountGen - time = new TestTime() - txs <- Gen.nonEmptyListOf(transferWithRecipient(sender, recipient.publicKey, senderBalance / 10, time)) - } yield { - val settings = - UtxSettings( - txs.size, - PoolDefaultMaxBytes, - 1000, - Set.empty, - Set.empty, - Set.empty, - allowTransactionsFromSmartAccounts = true, - allowSkipChecks = false - ) - val utxPool = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, settings) - (sender, bcu, utxPool, txs, time, settings) - }).label("withValidPayments") - private val withBlacklisted = (for { (sender, senderBalance, bcu) <- stateGen recipient <- accountGen @@ -284,7 +221,7 @@ class UtxPoolSpecification allowTransactionsFromSmartAccounts = true, allowSkipChecks = false ) - val utxPool = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, settings) + val utxPool = new UtxPoolImpl(time, bcu, settings) (sender, utxPool, txs) }).label("withBlacklisted") @@ -305,7 +242,7 @@ class UtxPoolSpecification allowTransactionsFromSmartAccounts = true, allowSkipChecks = false ) - val utxPool = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, settings) + val utxPool = new UtxPoolImpl(time, bcu, settings) (sender, utxPool, txs) }).label("withBlacklistedAndAllowedByRule") @@ -326,7 +263,7 @@ class UtxPoolSpecification allowTransactionsFromSmartAccounts = true, allowSkipChecks = false ) - val utxPool = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, settings) + val utxPool = new UtxPoolImpl(time, bcu, settings) (sender, utxPool, txs) }).label("withBlacklistedAndWhitelisted") @@ -351,7 +288,7 @@ class UtxPoolSpecification allowTransactionsFromSmartAccounts = true, allowSkipChecks = false ) - val utxPool = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, settings) + val utxPool = new UtxPoolImpl(time, bcu, settings) (sender, utxPool, txs) }).label("massTransferWithBlacklisted") @@ -361,7 +298,7 @@ class UtxPoolSpecification val time = new TestTime() forAll(listOfN(count, transfer(sender, senderBalance / 2, time))) { txs => - val utx = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, utxSettings) + val utx = new UtxPoolImpl(time, bcu, utxSettings) f(txs, utx, time) } } @@ -375,11 +312,7 @@ class UtxPoolSpecification tx2 <- listOfN(count1, transfer(sender, senderBalance / 2, new TestTime(ts + maxAge.toMillis + 1000))) } yield { val time = new TestTime() - val utx = new UtxPoolImpl( - time, - bcu, - ignoreSpendableBalanceChanged, - UtxSettings( + val utx = new UtxPoolImpl(time, bcu, UtxSettings( 10, PoolDefaultMaxBytes, 1000, @@ -388,8 +321,7 @@ class UtxPoolSpecification Set.empty, allowTransactionsFromSmartAccounts = true, allowSkipChecks = false - ) - ) + )) (utx, time, tx1, tx2) } @@ -417,11 +349,7 @@ class UtxPoolSpecification } yield { // val smartAccountsFs = TestFunctionalitySettings.Enabled.copy(preActivatedFeatures = Map(BlockchainFeatures.SmartAccounts.id -> 0)) preconditions.foreach(b => bcu.processBlock(b) should beRight) - val utx = new UtxPoolImpl( - time, - bcu, - ignoreSpendableBalanceChanged, - UtxSettings( + val utx = new UtxPoolImpl(time, bcu, UtxSettings( 10, PoolDefaultMaxBytes, 1000, @@ -430,8 +358,7 @@ class UtxPoolSpecification Set.empty, allowTransactionsFromSmartAccounts = scEnabled, allowSkipChecks = false - ) - ) + )) (sender, senderBalance, utx, bcu.lastBlockTimestamp.getOrElse(0L)) } @@ -483,7 +410,7 @@ class UtxPoolSpecification allowTransactionsFromSmartAccounts = true, allowSkipChecks = allowSkipChecks == 1 ) - val utx = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, utxSettings) + val utx = new UtxPoolImpl(time, bcu, utxSettings) utx.putIfNew(headTransaction).resultE should beRight utx.putIfNew(vipTransaction).resultE should matchPattern { @@ -530,7 +457,7 @@ class UtxPoolSpecification allowTransactionsFromSmartAccounts = true, allowSkipChecks = allowSkipChecks ) - val utx = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, utxSettings) + val utx = new UtxPoolImpl(time, bcu, utxSettings) utx.putIfNew(headTransaction).resultE.explicitGet() utx.putIfNew(vipTransaction).resultE.explicitGet() @@ -602,7 +529,7 @@ class UtxPoolSpecification allowTransactionsFromSmartAccounts = true, allowSkipChecks = allowSkipChecks ) - val utx = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, utxSettings) + val utx = new UtxPoolImpl(time, bcu, utxSettings) Random.shuffle(whitelistedTxs ++ txs).foreach(tx => utx.putIfNew(tx)) @@ -670,11 +597,7 @@ class UtxPoolSpecification val (_, block, scripted, unscripted) = generateBlock.sample.get d.blockchainUpdater.processBlock(block) should beRight - val utx = new UtxPoolImpl( - ntpTime, - d.blockchainUpdater, - ignoreSpendableBalanceChanged, - UtxSettings( + val utx = new UtxPoolImpl(ntpTime, d.blockchainUpdater, UtxSettings( 9999999, PoolDefaultMaxBytes, 999999, @@ -683,8 +606,7 @@ class UtxPoolSpecification Set.empty, allowTransactionsFromSmartAccounts = true, allowSkipChecks = false - ) - ) + )) (scripted ++ unscripted).foreach(tx => utx.putIfNew(tx).resultE.explicitGet()) val constraint = MultiDimensionalMiningConstraint( @@ -699,61 +621,6 @@ class UtxPoolSpecification } } - "pessimisticPortfolio" - { - "is not empty if there are transactions" in forAll(withValidPayments) { - case (sender, _, utxPool, _, _) => - utxPool.size should be > 0 - utxPool.pessimisticPortfolio(sender.toAddress) should not be empty - } - - "is empty if there is no transactions" in forAll(emptyUtxPool) { - case (sender, _, utxPool) => - utxPool.size shouldBe 0 - utxPool.pessimisticPortfolio(sender.toAddress) shouldBe empty - } - - "is empty if utx pool was cleaned" in forAll(withValidPayments) { - case (sender, _, utxPool, _, _) => - utxPool.removeAll(utxPool.all) - utxPool.pessimisticPortfolio(sender.toAddress) shouldBe empty - } - - "is changed after transactions with these assets are removed" in forAll(withValidPayments) { - case (sender, _, utxPool, time, _) => - val portfolioBefore = utxPool.pessimisticPortfolio(sender.toAddress) - val poolSizeBefore = utxPool.size - - time.advance(maxAge * 2) - utxPool.packUnconfirmed(limitByNumber(100), PackStrategy.Unlimited) - - poolSizeBefore should be > utxPool.size - val portfolioAfter = utxPool.pessimisticPortfolio(sender.toAddress) - - portfolioAfter should not be portfolioBefore - } - } - - "spendableBalance" - { - "equal to state's portfolio if utx is empty" in forAll(emptyUtxPool) { - case (sender, _, utxPool) => - val pessimisticAssetIds = { - val p = utxPool.pessimisticPortfolio(sender.toAddress) - p.assetIds.filter(x => p.balanceOf(x) != 0) - } - - pessimisticAssetIds shouldBe empty - } - - "takes into account added txs" in forAll(withValidPaymentsNotAdded) { - case (sender, _, utxPool, txs, _, _) => - val emptyPf = utxPool.pessimisticPortfolio(sender.toAddress) - txs.foreach(utxPool.putIfNew(_).resultE should beRight) - utxPool.pessimisticPortfolio(sender.toAddress) should not be emptyPf - } - - "takes into account unconfirmed transactions" in pending - } - "blacklisting" - { "prevent a transfer transaction from specific addresses" in { val transferGen = Gen.oneOf(withBlacklisted, massTransferWithBlacklisted(allowRecipients = false)) @@ -847,7 +714,7 @@ class UtxPoolSpecification allowTransactionsFromSmartAccounts = true, allowSkipChecks = false ) - val utxPool = new UtxPoolImpl(time, bcu, ignoreSpendableBalanceChanged, settings, nanoTimeSource = () => nanoTimeSource()) + val utxPool = new UtxPoolImpl(time, bcu, settings, nanoTimeSource = () => nanoTimeSource()) utxPool.putIfNew(transfer).resultE should beRight val (tx, _) = utxPool.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited, PackStrategy.Limit(100 nanos)) @@ -866,7 +733,7 @@ class UtxPoolSpecification allowSkipChecks = false, fastLaneAddresses = Set.empty ) - val utxPool = new UtxPoolImpl(ntpTime, d.blockchainUpdater, ignoreSpendableBalanceChanged, settings) + val utxPool = new UtxPoolImpl(ntpTime, d.blockchainUpdater, settings) val startTime = System.nanoTime() val (result, _) = utxPool.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited, PackStrategy.Estimate(3 seconds)) result shouldBe None @@ -922,7 +789,7 @@ class UtxPoolSpecification d.appendBlock(setScripts: _*) val invoke = TxHelpers.invoke(genesisTxs.head.recipient, "default") - val utx = new UtxPoolImpl(ntpTime, d.blockchainUpdater, PublishSubject(), DefaultWavesSettings.utxSettings) + val utx = new UtxPoolImpl(ntpTime, d.blockchainUpdater, DefaultWavesSettings.utxSettings) utx.putIfNew(invoke, forceValidate = true).resultE.explicitGet() shouldBe true utx.removeAll(Seq(invoke)) utx.putIfNew(invoke, forceValidate = false).resultE.explicitGet() shouldBe true @@ -941,7 +808,7 @@ class UtxPoolSpecification val expr = TestCompiler(V6).compileFreeCall(""" [ BooleanEntry("check", true) ] """) val invoke = TxHelpers.invokeExpression(expr) - val utx = new UtxPoolImpl(ntpTime, d.blockchainUpdater, PublishSubject(), DefaultWavesSettings.utxSettings) + val utx = new UtxPoolImpl(ntpTime, d.blockchainUpdater, DefaultWavesSettings.utxSettings) utx.putIfNew(invoke).resultE.explicitGet() shouldBe true utx.all shouldBe Seq(invoke) @@ -968,7 +835,7 @@ class UtxPoolSpecification (() => blockchain.activatedFeatures).when().returning(Map.empty) val utx = - new UtxPoolImpl(ntpTime, blockchain, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings) + new UtxPoolImpl(ntpTime, blockchain, WavesSettings.default().utxSettings) (blockchain.balance _).when(*, *).returning(ENOUGH_AMT).repeat((rest.length + 1) * 2) (blockchain.balance _) @@ -1018,10 +885,10 @@ class UtxPoolSpecification val time = new TestTime() val events = new ListBuffer[UtxEvent] val utxPool = - new UtxPoolImpl(time, d.blockchainUpdater, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings, events += _) + new UtxPoolImpl(time, d.blockchainUpdater, WavesSettings.default().utxSettings, events += _) def assertEvents(f: PartialFunction[Seq[UtxEvent], Unit]): Unit = { - val currentEvents = events.toVector + val currentEvents = events.toList f(currentEvents) events.clear() } diff --git a/node/src/test/scala/com/wavesplatform/utx/UtxPriorityPoolSpecification.scala b/node/src/test/scala/com/wavesplatform/utx/UtxPriorityPoolSpecification.scala index fe57a53904e..5e82558c564 100644 --- a/node/src/test/scala/com/wavesplatform/utx/UtxPriorityPoolSpecification.scala +++ b/node/src/test/scala/com/wavesplatform/utx/UtxPriorityPoolSpecification.scala @@ -1,7 +1,10 @@ package com.wavesplatform.utx +import scala.concurrent.duration._ + import cats.data.NonEmptyList import cats.kernel.Monoid +import com.wavesplatform.{BlocksTransactionsHelpers, TestValues} import com.wavesplatform.account.{Address, KeyPair, PublicKey} import com.wavesplatform.block.SignedBlockHeader import com.wavesplatform.common.state.ByteStr @@ -12,15 +15,14 @@ import com.wavesplatform.features.BlockchainFeatures import com.wavesplatform.lagonaki.mocks.TestBlock import com.wavesplatform.mining.{MultiDimensionalMiningConstraint, OneDimensionalMiningConstraint, TxEstimators} import com.wavesplatform.settings.WavesSettings -import com.wavesplatform.state.diffs.ENOUGH_AMT import com.wavesplatform.state.{AccountScriptInfo, Blockchain, Diff, LeaseBalance, Portfolio} +import com.wavesplatform.state.diffs.ENOUGH_AMT import com.wavesplatform.test.FreeSpec +import com.wavesplatform.transaction.{Transaction, TxHelpers, TxVersion} import com.wavesplatform.transaction.Asset.Waves import com.wavesplatform.transaction.transfer.TransferTransaction -import com.wavesplatform.transaction.{Transaction, TxHelpers, TxVersion} import com.wavesplatform.utils.Time import com.wavesplatform.utx.UtxPool.PackStrategy -import com.wavesplatform.{BlocksTransactionsHelpers, TestValues} import org.scalacheck.Gen import org.scalacheck.Gen.chooseNum import org.scalamock.scalatest.MockFactory @@ -29,8 +31,6 @@ import org.scalatest.concurrent.Eventually import org.scalatest.concurrent.PatienceConfiguration.{Interval, Timeout} import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks -import scala.concurrent.duration._ - class UtxPriorityPoolSpecification extends FreeSpec with MockFactory @@ -57,19 +57,6 @@ class UtxPriorityPoolSpecification } } - def assertPortfolios(utx: UtxPool, transactions: Seq[TransferTransaction]): Unit = { - val portfolios = transactions.groupBy(_.sender.toAddress).map { - case (addr, transactions) => - val amt = transactions.map(tx => -(tx.amount + tx.fee)).sum - (addr, amt) - } - portfolios.foreach { - case (addr, balance) => - val pf = utx.pessimisticPortfolio(addr) - pf.balance shouldBe balance - } - } - val gen = for { acc <- accountGen acc1 <- accountGen @@ -102,7 +89,7 @@ class UtxPriorityPoolSpecification case (tx1, nonScripted, scripted) => val blockchain = createState(scripted.head.sender.toAddress) val utx = - new UtxPoolImpl(ntpTime, blockchain, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings) + new UtxPoolImpl(ntpTime, blockchain, WavesSettings.default().utxSettings) utx.putIfNew(tx1).resultE should beRight val minedTxs = scripted ++ nonScripted utx.setPriorityTxs(minedTxs) @@ -116,7 +103,6 @@ class UtxPriorityPoolSpecification val expectedTxs = minedTxs :+ tx1 utx.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited, PackStrategy.Unlimited)._1 shouldBe Some(expectedTxs) utx.all shouldBe expectedTxs - assertPortfolios(utx, expectedTxs) val (left, right) = minedTxs.splitAt(minedTxs.length / 2) @@ -124,31 +110,27 @@ class UtxPriorityPoolSpecification utx.priorityPool.priorityTransactions should not be empty val expectedTxs1 = right :+ tx1 - assertPortfolios(utx, expectedTxs1) all(right.map(utx.putIfNew(_).resultE)) shouldBe Right(false) val test = utx.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited, PackStrategy.Unlimited)._1 test shouldBe Some(expectedTxs1) utx.all shouldBe expectedTxs1 - assertPortfolios(utx, expectedTxs1) val expectedTxs2 = expectedTxs1 ++ left.sorted(TransactionsOrdering.InUTXPool(Set())) utx.removeAll(expectedTxs2) left.foreach(utx.putIfNew(_).resultE should beRight) utx.setPriorityTxs(expectedTxs1) utx.all shouldBe expectedTxs2 - assertPortfolios(utx, expectedTxs2) utx.removeAll(expectedTxs2) utx.all shouldBe empty utx.packUnconfirmed(MultiDimensionalMiningConstraint.unlimited, PackStrategy.Unlimited)._1 shouldBe empty - all(expectedTxs2.map(tx => utx.pessimisticPortfolio(tx.sender.toAddress))) shouldBe empty } "removes priority transactions from ordinary pool on pack" in forAll(gen) { case (_, nonScripted, scripted) => val blockchain = createState(scripted.head.sender.toAddress) val utx = - new UtxPoolImpl(ntpTime, blockchain, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings) + new UtxPoolImpl(ntpTime, blockchain, WavesSettings.default().utxSettings) utx.setPriorityTxs(nonScripted) nonScripted.foreach(utx.putIfNew(_).resultE should beRight) @@ -171,7 +153,7 @@ class UtxPriorityPoolSpecification (blockchain.balance _).when(*, *).returning(0) // Should be overriden in composite blockchain val utx = - new UtxPoolImpl(ntpTime, blockchain, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings) + new UtxPoolImpl(ntpTime, blockchain, WavesSettings.default().utxSettings) utx.setPriorityTxs(Seq(tx1)) utx.putNewTx(tx2, verify = false, forceValidate = false).resultE should beRight utx.nonPriorityTransactions shouldBe Seq(tx2) @@ -181,7 +163,7 @@ class UtxPriorityPoolSpecification "counts microblock size from priority diffs" in { val blockchain = createState(TxHelpers.defaultSigner.toAddress) val utx = - new UtxPoolImpl(ntpTime, blockchain, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings) + new UtxPoolImpl(ntpTime, blockchain, WavesSettings.default().utxSettings) def createDiff(): Diff = Monoid.combineAll((1 to 5).map(_ => Diff.empty.bindTransaction(TxHelpers.issue()))) @@ -200,7 +182,7 @@ class UtxPriorityPoolSpecification (blockchain.balance _).when(*, *).returning(0) // All invalid val utx = - new UtxPoolImpl(ntpTime, blockchain, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings) + new UtxPoolImpl(ntpTime, blockchain, WavesSettings.default().utxSettings) utx.setPriorityTxs(Seq(tx1, tx2)) utx.runCleanup() @@ -214,7 +196,7 @@ class UtxPriorityPoolSpecification (blockchain.balance _).when(*, *).returning(0L) val utx = - new UtxPoolImpl(ntpTime, blockchain, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings) + new UtxPoolImpl(ntpTime, blockchain, WavesSettings.default().utxSettings) utx.setPriorityTxs(Seq(tx1, tx2)) utx.removeAll(Seq(TxHelpers.issue())) @@ -237,7 +219,7 @@ class UtxPriorityPoolSpecification (blockchain.balance _).when(tx2.sender.toAddress, *).returning(0) val utx = - new UtxPoolImpl(ntpTime, blockchain, ignoreSpendableBalanceChanged, WavesSettings.default().utxSettings) + new UtxPoolImpl(ntpTime, blockchain, WavesSettings.default().utxSettings) utx.setPriorityTxs(Seq(tx1)) utx.putNewTx(tx2, true, false).resultE.explicitGet() utx.nonPriorityTransactions shouldBe Seq(tx2)