diff --git a/.github/workflows/publish-docker-node.yaml b/.github/workflows/publish-docker-node.yaml
index 804c39d8b5c..8fddb878c95 100644
--- a/.github/workflows/publish-docker-node.yaml
+++ b/.github/workflows/publish-docker-node.yaml
@@ -31,7 +31,7 @@ jobs:
- name: Build sources
run: |
- sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt buildTarballsForDocker
+ sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt ;buildTarballsForDocker;buildRIDERunnerForDocker
- name: Setup Docker buildx
uses: docker/setup-buildx-action@v2
diff --git a/build.sbt b/build.sbt
index fbe44e4f3a8..f56a44abf22 100644
--- a/build.sbt
+++ b/build.sbt
@@ -84,10 +84,7 @@ lazy val repl = crossProject(JSPlatform, JVMPlatform)
libraryDependencies ++=
Dependencies.protobuf.value ++
Dependencies.langCompilerPlugins.value ++
- Dependencies.circe.value ++
- Seq(
- "org.scala-js" %%% "scala-js-macrotask-executor" % "1.0.0"
- ),
+ Dependencies.circe.value,
inConfig(Compile)(
Seq(
PB.targets += scalapb.gen(flatPackage = true) -> sourceManaged.value,
@@ -109,6 +106,9 @@ lazy val `repl-jvm` = repl.jvm
)
lazy val `repl-js` = repl.js.dependsOn(`lang-js`)
+ .settings(
+ libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % "1.1.1"
+ )
lazy val `curve25519-test` = project.dependsOn(node)
@@ -198,6 +198,10 @@ buildTarballsForDocker := {
(`grpc-server` / Universal / packageZipTarball).value,
baseDirectory.value / "docker" / "target" / "waves-grpc-server.tgz"
)
+}
+
+lazy val buildRIDERunnerForDocker = taskKey[Unit]("Package RIDE Runner tarball and copy it to docker/target")
+buildRIDERunnerForDocker := {
IO.copyFile(
(`ride-runner` / Universal / packageZipTarball).value,
(`ride-runner` / baseDirectory).value / "docker" / "target" / s"${(`ride-runner` / name).value}.tgz"
@@ -244,7 +248,6 @@ lazy val buildDebPackages = taskKey[Unit]("Build debian packages")
buildDebPackages := {
(`grpc-server` / Debian / packageBin).value
(node / Debian / packageBin).value
- (`ride-runner` / Debian / packageBin).value
}
def buildPackages: Command = Command("buildPackages")(_ => Network.networkParser) { (state, args) =>
diff --git a/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala b/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala
index 166f41d5a47..270a99986ff 100644
--- a/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala
+++ b/grpc-server/src/main/scala/com/wavesplatform/events/BlockchainUpdates.scala
@@ -2,7 +2,6 @@ package com.wavesplatform.events
import com.wavesplatform.block.{Block, MicroBlock}
import com.wavesplatform.common.state.ByteStr
-import com.wavesplatform.database.RDB
import com.wavesplatform.events.api.grpc.protobuf.BlockchainUpdatesApiGrpc
import com.wavesplatform.events.settings.BlockchainUpdatesSettings
import com.wavesplatform.extensions.{Context, Extension}
@@ -14,6 +13,7 @@ import io.grpc.{Metadata, Server, ServerStreamTracer, Status}
import monix.execution.schedulers.SchedulerService
import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter}
import net.ceedubs.ficus.Ficus.*
+import org.rocksdb.RocksDB
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
@@ -31,9 +31,8 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
)
private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates")
- // todo: no need to open column families here
- private[this] val rdb = RDB.open(context.settings.dbSettings.copy(directory = context.settings.directory + "/blockchain-updates"))
- private[this] val repo = new Repo(rdb.db, context.blocksApi)
+ private[this] val rdb = RocksDB.open(context.settings.directory + "/blockchain-updates")
+ private[this] val repo = new Repo(rdb, context.blocksApi)
private[this] val grpcServer: Server = NettyServerBuilder
.forAddress(new InetSocketAddress("0.0.0.0", settings.grpcPort))
diff --git a/lang/jvm/src/main/scala/com/wavesplatform/crypto/Curve25519.scala b/lang/jvm/src/main/scala/com/wavesplatform/crypto/Curve25519.scala
index 40f876471ff..5c31009e71a 100644
--- a/lang/jvm/src/main/scala/com/wavesplatform/crypto/Curve25519.scala
+++ b/lang/jvm/src/main/scala/com/wavesplatform/crypto/Curve25519.scala
@@ -21,6 +21,9 @@ object Curve25519 {
def sign(privateKey: Array[Byte], message: Array[Byte]): Array[Byte] =
provider.calculateSignature(provider.getRandom(SignatureLength), privateKey, message)
- def verify(signature: Array[Byte], message: Array[Byte], publicKey: Array[Byte]): Boolean = provider.verifySignature(publicKey, message, signature)
-
+ def verify(signature: Array[Byte], message: Array[Byte], publicKey: Array[Byte]): Boolean =
+ signature != null && signature.length == SignatureLength &&
+ publicKey != null && publicKey.length == KeyLength &&
+ message != null &&
+ provider.verifySignature(publicKey, message, signature)
}
diff --git a/lang/testkit/src/test/resources/logback-test.xml b/lang/testkit/src/test/resources/logback-test.xml
deleted file mode 100644
index 471a19efaff..00000000000
--- a/lang/testkit/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,14 +0,0 @@
-
-
-
-
- %date %-5level [%.15thread] %logger{26} - %msg%n
-
-
-
-
-
-
-
-
-
diff --git a/lang/testkit/src/test/scala/com/wavesplatform/report/QaseReporter.scala b/lang/testkit/src/test/scala/com/wavesplatform/report/QaseReporter.scala
index fc3dbc834c6..92d692a1a7f 100644
--- a/lang/testkit/src/test/scala/com/wavesplatform/report/QaseReporter.scala
+++ b/lang/testkit/src/test/scala/com/wavesplatform/report/QaseReporter.scala
@@ -1,9 +1,10 @@
package com.wavesplatform.report
import com.wavesplatform.report.QaseReporter.{CaseIdPattern, QaseProjects, TestResult}
-import io.qase.api.QaseClient
+import io.qase.api.config.QaseConfig
import io.qase.api.utils.IntegrationUtils
import io.qase.client.model.ResultCreate
+import org.aeonbits.owner.ConfigFactory
import org.scalatest.Reporter
import org.scalatest.events.*
import play.api.libs.json.{Format, Json}
@@ -45,7 +46,7 @@ class QaseReporter extends Reporter {
msgOpt: Option[String],
duration: Option[Long]
): Unit =
- if (QaseClient.isEnabled) {
+ if (QaseReporter.isEnabled) {
val errMsg = msgOpt.map(msg => s"\n\n**Error**\n$msg").getOrElse("")
val comment = s"$testName$errMsg"
val stacktrace = throwable.map(IntegrationUtils.getStacktrace)
@@ -55,7 +56,7 @@ class QaseReporter extends Reporter {
}
private def saveRunResults(): Unit =
- if (QaseClient.isEnabled) {
+ if (QaseReporter.isEnabled) {
results.asScala.foreach { case (projectCode, results) =>
if (results.nonEmpty) {
val writer = new FileWriter(s"./$projectCode-${System.currentTimeMillis()}")
@@ -73,6 +74,10 @@ class QaseReporter extends Reporter {
}
object QaseReporter {
+ // this hack prevents QaseClient class from being initialized, which in turn initializes Logback with malformed config
+ // and prints a warning about unused appender to stdout
+ private[QaseReporter] val isEnabled = ConfigFactory.create(classOf[QaseConfig]).isEnabled
+
val RunIdKeyPrefix = "QASE_RUN_ID_"
val CheckPRRunIdKey = "CHECKPR_RUN_ID"
val QaseProjects = Seq("NODE", "RIDE", "BU", "SAPI")
diff --git a/lang/tests/src/test/resources/logback-test.xml b/lang/tests/src/test/resources/logback-test.xml
deleted file mode 100644
index b1f394b2e41..00000000000
--- a/lang/tests/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,14 +0,0 @@
-
-
-
-
- %date %-5level [%.15thread] %logger{26} - %msg%n
-
-
-
-
-
-
-
-
-
diff --git a/node-it/build.sbt b/node-it/build.sbt
index d2d0af510ca..7e736881401 100644
--- a/node-it/build.sbt
+++ b/node-it/build.sbt
@@ -11,5 +11,5 @@ inTask(docker)(
)
)
-val packageAll = taskKey[Unit]("build all packages")
-docker := docker.dependsOn(LocalProject("waves-node") / packageAll).value
+val buildTarballsForDocker = taskKey[Unit]("build all packages")
+docker := docker.dependsOn(LocalProject("waves-node") / buildTarballsForDocker).value
diff --git a/node-it/src/test/scala/com/wavesplatform/test/BlockchainGenerator.scala b/node-it/src/test/scala/com/wavesplatform/test/BlockchainGenerator.scala
index d66ce80616a..a8932b09bc2 100644
--- a/node-it/src/test/scala/com/wavesplatform/test/BlockchainGenerator.scala
+++ b/node-it/src/test/scala/com/wavesplatform/test/BlockchainGenerator.scala
@@ -64,10 +64,10 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {
private val settings: WavesSettings = wavesSettings.copy(minerSettings = wavesSettings.minerSettings.copy(quorum = 0))
- def generateDb(genBlocks: Seq[GenBlock], dbDirPath: String = settings.dbSettings.directory): Unit =
+ def generateDb(genBlocks: Iterator[GenBlock], dbDirPath: String = settings.dbSettings.directory): Unit =
generateBlockchain(genBlocks, settings.dbSettings.copy(directory = dbDirPath))
- def generateBinaryFile(genBlocks: Seq[GenBlock]): Unit = {
+ def generateBinaryFile(genBlocks: Iterator[GenBlock]): Unit = {
val targetHeight = genBlocks.size + 1
log.info(s"Exporting to $targetHeight")
val outputFilename = s"blockchain-$targetHeight"
@@ -94,7 +94,7 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {
}
}
- private def generateBlockchain(genBlocks: Seq[GenBlock], dbSettings: DBSettings, exportToFile: Block => Unit = _ => ()): Unit = {
+ private def generateBlockchain(genBlocks: Iterator[GenBlock], dbSettings: DBSettings, exportToFile: Block => Unit = _ => ()): Unit = {
val scheduler = Schedulers.singleThread("appender")
val time = new Time {
val startTime: Long = settings.blockchainSettings.genesisSettings.timestamp
@@ -185,7 +185,7 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {
}
case Left(err) => log.error(s"Error appending block: $err")
}
- }
+ }.get
}
private def correctTxTimestamp(genTx: GenTx, time: Time): Transaction =
diff --git a/node/src/main/resources/application.conf b/node/src/main/resources/application.conf
index f642aa6f81b..8501dbf4b1b 100644
--- a/node/src/main/resources/application.conf
+++ b/node/src/main/resources/application.conf
@@ -23,7 +23,6 @@ waves {
max-cache-size = 100000
max-rollback-depth = 2000
- remember-blocks = 3h
# Delete old history entries (Data, WAVES and Asset balances) in this interval before a safe rollback height.
# Comment to disable.
@@ -40,15 +39,16 @@ waves {
# AA Asset balance history for address.
# cleanup-interval = 500 # Optimal for Xmx2G
- use-bloom-filter = false
-
rocksdb {
main-cache-size = 512M
tx-cache-size = 16M
tx-meta-cache-size = 16M
tx-snapshot-cache-size = 16M
+ api-cache-size=16M
write-buffer-size = 128M
enable-statistics = false
+ # When enabled, after writing every SST file of the default column family, reopen it and read all the keys.
+ paranoid-checks = off
}
}
diff --git a/node/src/main/scala/com/wavesplatform/Explorer.scala b/node/src/main/scala/com/wavesplatform/Explorer.scala
index 96bae8692fc..b51f99e5e97 100644
--- a/node/src/main/scala/com/wavesplatform/Explorer.scala
+++ b/node/src/main/scala/com/wavesplatform/Explorer.scala
@@ -390,6 +390,92 @@ object Explorer extends ScorexLogging {
log.info(s"Load meta for $id")
val meta = rdb.db.get(Keys.transactionMetaById(TransactionId(ByteStr.decodeBase58(id).get), rdb.txMetaHandle))
log.info(s"Meta: $meta")
+ case "DH" =>
+ val address = Address.fromString(argument(1, "address")).explicitGet()
+ val key = argument(2, "key")
+ val requestedHeight = argument(3, "height").toInt
+ log.info(s"Loading address ID for $address")
+ val addressId = rdb.db.get(Keys.addressId(address)).get
+ log.info(s"Collecting data history for key $key on $address ($addressId)")
+ val currentEntry = rdb.db.get(Keys.data(addressId, key))
+ log.info(s"Current entry: $currentEntry")
+ val problematicEntry = rdb.db.get(Keys.dataAt(addressId, key)(requestedHeight))
+ log.info(s"Entry at $requestedHeight: $problematicEntry")
+ case "DHC" =>
+ log.info("Looking for data entry history corruptions")
+ var thisAddressId = 0L
+ var prevHeight = 0
+ var key = ""
+ var addressCount = 0
+ rdb.db.iterateOver(KeyTags.DataHistory.prefixBytes, None) { e =>
+ val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(2, 10))
+ val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
+ val keyFromKey = new String(e.getKey.drop(10).dropRight(4), "utf-8")
+ if (addressIdFromKey != thisAddressId) {
+ thisAddressId = addressIdFromKey
+ key = keyFromKey
+ addressCount += 1
+ } else if (key != keyFromKey) {
+ key = keyFromKey
+ } else {
+ val node = readDataNode(key)(e.getValue)
+ if (node.prevHeight != prevHeight) {
+ val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
+ log.warn(s"$address/$key@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")
+
+ }
+ }
+ prevHeight = heightFromKey
+ }
+ log.info(s"Checked $addressCount addresses")
+ case "ABHC" =>
+ log.info("Looking for asset balance history corruptions")
+ var thisAddressId = 0L
+ var prevHeight = 0
+ var key = IssuedAsset(ByteStr(new Array[Byte](32)))
+ var addressCount = 0
+ rdb.db.iterateOver(KeyTags.AssetBalanceHistory.prefixBytes, None) { e =>
+ val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(34, 42))
+ val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
+ val keyFromKey = IssuedAsset(ByteStr(e.getKey.slice(2, 34)))
+ if (keyFromKey != key) {
+ thisAddressId = addressIdFromKey
+ key = keyFromKey
+ addressCount += 1
+ } else if (thisAddressId != addressIdFromKey) {
+ thisAddressId = addressIdFromKey
+ } else {
+ val node = readBalanceNode(e.getValue)
+ if (node.prevHeight != prevHeight) {
+ val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
+ log.warn(s"$key/$address@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")
+
+ }
+ }
+ prevHeight = heightFromKey
+ }
+ log.info(s"Checked $addressCount assets")
+ case "BHC" =>
+ log.info("Looking for balance history corruptions")
+ var thisAddressId = 0L
+ var prevHeight = 0
+ var addressCount = 0
+ rdb.db.iterateOver(KeyTags.WavesBalanceHistory.prefixBytes, None) { e =>
+ val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(2, 10))
+ val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
+ if (addressIdFromKey != thisAddressId) {
+ thisAddressId = addressIdFromKey
+ addressCount += 1
+ } else {
+ val node = readBalanceNode(e.getValue)
+ if (node.prevHeight != prevHeight) {
+ val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
+ log.warn(s"$address@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")
+ }
+ }
+ prevHeight = heightFromKey
+ }
+ log.info(s"Checked $addressCount addresses")
}
} finally {
reader.close()
diff --git a/node/src/main/scala/com/wavesplatform/GenesisBlockGenerator.scala b/node/src/main/scala/com/wavesplatform/GenesisBlockGenerator.scala
index f41d5874a4b..51444fddf50 100644
--- a/node/src/main/scala/com/wavesplatform/GenesisBlockGenerator.scala
+++ b/node/src/main/scala/com/wavesplatform/GenesisBlockGenerator.scala
@@ -96,7 +96,7 @@ object GenesisBlockGenerator {
.headOption
.map(new File(_).getAbsoluteFile.ensuring(f => !f.isDirectory && f.getParentFile.isDirectory || f.getParentFile.mkdirs()))
- val settings = parseSettings(ConfigFactory.parseFile(inputConfFile))
+ val settings = parseSettings(ConfigFactory.parseFile(inputConfFile).resolve())
val confBody = createConfig(settings)
outputConfFile.foreach(ocf => Files.write(ocf.toPath, confBody.utf8Bytes))
}
diff --git a/node/src/main/scala/com/wavesplatform/Importer.scala b/node/src/main/scala/com/wavesplatform/Importer.scala
index 7f20c716e23..2b959f8030e 100644
--- a/node/src/main/scala/com/wavesplatform/Importer.scala
+++ b/node/src/main/scala/com/wavesplatform/Importer.scala
@@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import cats.implicits.catsSyntaxOption
import cats.syntax.apply.*
import com.google.common.io.ByteStreams
-import com.google.common.primitives.Ints
+import com.google.common.primitives.{Ints, Longs}
import com.wavesplatform.Exporter.Formats
import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonBlocksApi, CommonTransactionsApi}
import com.wavesplatform.block.{Block, BlockHeader}
@@ -217,6 +217,8 @@ object Importer extends ScorexLogging {
val maxSize = importOptions.maxQueueSize
val queue = new mutable.Queue[(VanillaBlock, Option[BlockSnapshotResponse])](maxSize)
+ val CurrentTS = System.currentTimeMillis()
+
@tailrec
def readBlocks(queue: mutable.Queue[(VanillaBlock, Option[BlockSnapshotResponse])], remainCount: Int, maxCount: Int): Unit = {
if (remainCount == 0) ()
@@ -247,11 +249,12 @@ object Importer extends ScorexLogging {
if (blocksToSkip > 0) {
blocksToSkip -= 1
} else {
- val blockV5 = blockchain.isFeatureActivated(BlockchainFeatures.BlockV5, blockchain.height + (maxCount - remainCount) + 1)
val rideV6 = blockchain.isFeatureActivated(BlockchainFeatures.RideV6, blockchain.height + (maxCount - remainCount) + 1)
lazy val parsedProtoBlock = PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)
-
- val block = (if (!blockV5) Block.parseBytes(blockBytes) else parsedProtoBlock).orElse(parsedProtoBlock).get
+ val block = (if (1 < blockBytes.head && blockBytes.head < 5 && Longs.fromByteArray(blockBytes.slice(1, 9)) < CurrentTS)
+ Block.parseBytes(blockBytes).orElse(parsedProtoBlock)
+ else
+ parsedProtoBlock).get
val blockSnapshot = snapshotsBytes.map { bytes =>
BlockSnapshotResponse(
block.id(),
@@ -308,7 +311,7 @@ object Importer extends ScorexLogging {
case _ =>
counter = counter + 1
}
- } else {
+ } else if (!quit){
log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}")
}
}
@@ -360,7 +363,7 @@ object Importer extends ScorexLogging {
val blocksFileOffset =
importOptions.format match {
case Formats.Binary =>
- var blocksOffset = 0
+ var blocksOffset = 0L
rdb.db.iterateOver(KeyTags.BlockInfoAtHeight) { e =>
e.getKey match {
case Array(_, _, 0, 0, 0, 1) => // Skip genesis
diff --git a/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala b/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala
index 4fe340eac38..bfbc56bd260 100644
--- a/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala
+++ b/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala
@@ -33,28 +33,33 @@ object AddressTransactions {
}
.toSeq
- private def loadInvokeScriptResult(resource: DBResource, txMetaHandle: RDB.TxMetaHandle, txId: ByteStr): Option[InvokeScriptResult] =
+ private def loadInvokeScriptResult(
+ resource: DBResource,
+ txMetaHandle: RDB.TxMetaHandle,
+ apiHandle: RDB.ApiHandle,
+ txId: ByteStr
+ ): Option[InvokeScriptResult] =
for {
tm <- resource.get(Keys.transactionMetaById(TransactionId(txId), txMetaHandle))
- scriptResult <- resource.get(Keys.invokeScriptResult(tm.height, TxNum(tm.num.toShort)))
+ scriptResult <- resource.get(Keys.invokeScriptResult(tm.height, TxNum(tm.num.toShort), apiHandle))
} yield scriptResult
- def loadInvokeScriptResult(db: RocksDB, txMetaHandle: RDB.TxMetaHandle, txId: ByteStr): Option[InvokeScriptResult] =
- db.withResource(r => loadInvokeScriptResult(r, txMetaHandle, txId))
+ def loadInvokeScriptResult(db: RocksDB, txMetaHandle: RDB.TxMetaHandle, apiHandle: RDB.ApiHandle, txId: ByteStr): Option[InvokeScriptResult] =
+ db.withResource(r => loadInvokeScriptResult(r, txMetaHandle, apiHandle, txId))
- def loadInvokeScriptResult(db: RocksDB, height: Height, txNum: TxNum): Option[InvokeScriptResult] =
- db.get(Keys.invokeScriptResult(height, txNum))
+ def loadInvokeScriptResult(db: RocksDB, apiHandle: RDB.ApiHandle, height: Height, txNum: TxNum): Option[InvokeScriptResult] =
+ db.get(Keys.invokeScriptResult(height, txNum, apiHandle))
- def loadEthereumMetadata(db: RocksDB, txMetaHandle: RDB.TxMetaHandle, txId: ByteStr): Option[EthereumTransactionMeta] = db.withResource {
- resource =>
+ def loadEthereumMetadata(db: RocksDB, txMetaHandle: RDB.TxMetaHandle, apiHandle: RDB.ApiHandle, txId: ByteStr): Option[EthereumTransactionMeta] =
+ db.withResource { resource =>
for {
tm <- resource.get(Keys.transactionMetaById(TransactionId(txId), txMetaHandle))
- m <- resource.get(Keys.ethereumTransactionMeta(Height(tm.height), TxNum(tm.num.toShort)))
+ m <- resource.get(Keys.ethereumTransactionMeta(Height(tm.height), TxNum(tm.num.toShort), apiHandle))
} yield m
- }
+ }
- def loadEthereumMetadata(db: RocksDB, height: Height, txNum: TxNum): Option[EthereumTransactionMeta] =
- db.get(Keys.ethereumTransactionMeta(height, txNum))
+ def loadEthereumMetadata(db: RocksDB, apiHandle: RDB.ApiHandle, height: Height, txNum: TxNum): Option[EthereumTransactionMeta] =
+ db.get(Keys.ethereumTransactionMeta(height, txNum, apiHandle))
def allAddressTransactions(
rdb: RDB,
@@ -82,24 +87,25 @@ object AddressTransactions {
sender: Option[Address],
types: Set[Transaction.Type],
fromId: Option[ByteStr]
- ): Observable[(TxMeta, Transaction, Option[TxNum])] = rdb.db.resourceObservable.flatMap { dbResource =>
- dbResource
- .get(Keys.addressId(subject))
- .fold(Observable.empty[(TxMeta, Transaction, Option[TxNum])]) { addressId =>
- val (maxHeight, maxTxNum) =
- fromId
- .flatMap(id => rdb.db.get(Keys.transactionMetaById(TransactionId(id), rdb.txMetaHandle)))
- .fold[(Height, TxNum)](Height(Int.MaxValue) -> TxNum(Short.MaxValue)) { tm =>
- Height(tm.height) -> TxNum(tm.num.toShort)
- }
+ ): Observable[(TxMeta, Transaction, Option[TxNum])] =
+ rdb.db.resourceObservable(rdb.apiHandle.handle).flatMap { dbResource =>
+ dbResource
+ .get(Keys.addressId(subject))
+ .fold(Observable.empty[(TxMeta, Transaction, Option[TxNum])]) { addressId =>
+ val (maxHeight, maxTxNum) =
+ fromId
+ .flatMap(id => rdb.db.get(Keys.transactionMetaById(TransactionId(id), rdb.txMetaHandle)))
+ .fold[(Height, TxNum)](Height(Int.MaxValue) -> TxNum(Short.MaxValue)) { tm =>
+ Height(tm.height) -> TxNum(tm.num.toShort)
+ }
- Observable
- .fromIterator(
- Task(new TxByAddressIterator(dbResource, rdb.txHandle, addressId, maxHeight, maxTxNum, sender, types).asScala)
- )
- .concatMapIterable(identity)
- }
- }
+ Observable
+ .fromIterator(
+ Task(new TxByAddressIterator(dbResource, rdb.txHandle, rdb.apiHandle, addressId, maxHeight, maxTxNum, sender, types).asScala)
+ )
+ .concatMapIterable(identity)
+ }
+ }
private def transactionsFromSnapshot(
maybeSnapshot: Option[(Height, StateSnapshot)],
@@ -121,14 +127,15 @@ object AddressTransactions {
private class TxByAddressIterator(
db: DBResource,
txHandle: RDB.TxHandle,
+ apiHandle: RDB.ApiHandle,
addressId: AddressId,
maxHeight: Int,
maxTxNum: Int,
sender: Option[Address],
types: Set[Transaction.Type]
) extends AbstractIterator[Seq[(TxMeta, Transaction, Option[TxNum])]] {
- private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId))
- db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr).keyBytes))()
+ private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId, apiHandle))
+ db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr, apiHandle).keyBytes))()
final override def computeNext(): Seq[(TxMeta, Transaction, Option[TxNum])] = db.withSafePrefixIterator { dbIterator =>
val keysBuffer = new ArrayBuffer[Key[Option[(TxMeta, Transaction)]]]()
diff --git a/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala b/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala
index 812e38d4304..4faaf1b6c13 100644
--- a/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala
+++ b/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala
@@ -94,7 +94,7 @@ object CommonAccountsApi {
}
override def nftList(address: Address, after: Option[IssuedAsset]): Observable[Seq[(IssuedAsset, AssetDescription)]] = {
- rdb.db.resourceObservable.flatMap { resource =>
+ rdb.db.resourceObservable(rdb.apiHandle.handle).flatMap { resource =>
Observable
.fromIterator(Task(nftIterator(resource, address, compositeBlockchain().snapshot, after, blockchain.assetDescription)))
}
diff --git a/node/src/main/scala/com/wavesplatform/api/common/lease/AddressLeaseInfo.scala b/node/src/main/scala/com/wavesplatform/api/common/lease/AddressLeaseInfo.scala
index 9bb9d621846..85b6e9cd9e1 100644
--- a/node/src/main/scala/com/wavesplatform/api/common/lease/AddressLeaseInfo.scala
+++ b/node/src/main/scala/com/wavesplatform/api/common/lease/AddressLeaseInfo.scala
@@ -39,15 +39,15 @@ object AddressLeaseInfo {
private def leasesFromDb(rdb: RDB, subject: Address): Observable[LeaseInfo] =
for {
- dbResource <- rdb.db.resourceObservable
+ dbResource <- rdb.db.resourceObservable(rdb.apiHandle.handle)
(leaseId, details) <- dbResource
.get(Keys.addressId(subject))
- .map(fromLeaseDbIterator(dbResource, _))
+ .map(fromLeaseDbIterator(dbResource, rdb.apiHandle, _))
.getOrElse(Observable.empty)
} yield LeaseInfo.fromLeaseDetails(leaseId, details)
- private def fromLeaseDbIterator(dbResource: DBResource, addressId: AddressId): Observable[(ByteStr, LeaseDetails)] =
+ private def fromLeaseDbIterator(dbResource: DBResource, apiHandle: RDB.ApiHandle, addressId: AddressId): Observable[(ByteStr, LeaseDetails)] =
Observable
- .fromIterator(Task(new LeaseByAddressIterator(dbResource, addressId).asScala))
+ .fromIterator(Task(new LeaseByAddressIterator(dbResource, apiHandle, addressId).asScala))
.concatMapIterable(identity)
}
diff --git a/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala b/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala
index 6f18da0c217..f9821a7d214 100644
--- a/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala
+++ b/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala
@@ -3,18 +3,19 @@ package com.wavesplatform.api.common.lease
import com.google.common.collect.AbstractIterator
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database
-import com.wavesplatform.database.{AddressId, DBResource, Keys}
+import com.wavesplatform.database.{AddressId, DBResource, Keys, RDB}
import com.wavesplatform.state.LeaseDetails
-private class LeaseByAddressIterator(resource: DBResource, addressId: AddressId) extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] {
- private val seqNr = resource.get(Keys.addressLeaseSeqNr(addressId))
- resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr).keyBytes))()
+private class LeaseByAddressIterator(resource: DBResource, apiHandle: RDB.ApiHandle, addressId: AddressId)
+ extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] {
+ private val seqNr = resource.get(Keys.addressLeaseSeqNr(addressId, apiHandle))
+ resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr, apiHandle).keyBytes))()
final override def computeNext(): Seq[(ByteStr, LeaseDetails)] =
resource.withSafePrefixIterator { iterator =>
if (iterator.isValid) {
val details = for {
- id <- database.readLeaseIdSeq(iterator.value())
+ id <- database.readLeaseIdSeq(iterator.value())
details <- database.loadLease(resource, id) if details.isActive
} yield (id, details)
iterator.prev()
diff --git a/node/src/main/scala/com/wavesplatform/api/common/package.scala b/node/src/main/scala/com/wavesplatform/api/common/package.scala
index 6b2e1e20437..a1d0dc60478 100644
--- a/node/src/main/scala/com/wavesplatform/api/common/package.scala
+++ b/node/src/main/scala/com/wavesplatform/api/common/package.scala
@@ -28,12 +28,12 @@ package object common {
def loadISR(t: Transaction) =
maybeDiff
.flatMap { case (_, diff) => diff.scriptResults.get(t.id()) }
- .orElse(txNumOpt.flatMap(loadInvokeScriptResult(rdb.db, m.height, _)))
+ .orElse(txNumOpt.flatMap(loadInvokeScriptResult(rdb.db, rdb.apiHandle, m.height, _)))
def loadETM(t: Transaction) =
maybeDiff
.flatMap { case (_, diff) => diff.ethereumTransactionMeta.get(t.id()) }
- .orElse(txNumOpt.flatMap(loadEthereumMetadata(rdb.db, m.height, _)))
+ .orElse(txNumOpt.flatMap(loadEthereumMetadata(rdb.db, rdb.apiHandle, m.height, _)))
TransactionMeta.create(
m.height,
@@ -90,11 +90,11 @@ package object common {
ist =>
maybeSnapshot
.flatMap { case (_, s) => s.scriptResults.get(ist.id()) }
- .orElse(loadInvokeScriptResult(rdb.db, rdb.txMetaHandle, ist.id())),
+ .orElse(loadInvokeScriptResult(rdb.db, rdb.txMetaHandle, rdb.apiHandle, ist.id())),
et =>
maybeSnapshot
.flatMap { case (_, s) => s.ethereumTransactionMeta.get(et.id()) }
- .orElse(loadEthereumMetadata(rdb.db, rdb.txMetaHandle, et.id()))
+ .orElse(loadEthereumMetadata(rdb.db, rdb.txMetaHandle, rdb.apiHandle, et.id()))
)
}
}
diff --git a/node/src/main/scala/com/wavesplatform/database/Caches.scala b/node/src/main/scala/com/wavesplatform/database/Caches.scala
index 38a51d6f37d..5129ce7c6b2 100644
--- a/node/src/main/scala/com/wavesplatform/database/Caches.scala
+++ b/node/src/main/scala/com/wavesplatform/database/Caches.scala
@@ -128,7 +128,7 @@ abstract class Caches extends Blockchain with Storage {
VolumeAndFee(curVf.volume, curVf.fee)
}
- private val memMeter = MemoryMeter.builder().build()
+ protected val memMeter = MemoryMeter.builder().build()
private val scriptCache: LoadingCache[Address, Option[AccountScriptInfo]] =
CacheBuilder
@@ -183,7 +183,7 @@ abstract class Caches extends Blockchain with Storage {
protected def discardAccountData(addressWithKey: (Address, String)): Unit = accountDataCache.invalidate(addressWithKey)
protected def loadAccountData(acc: Address, key: String): CurrentData
- protected def loadEntryHeights(keys: Iterable[(Address, String)], addressIdOf: Address => AddressId): Map[(Address, String), Height]
+ protected def loadEntryHeights(keys: Seq[(Address, String)], addressIdOf: Address => AddressId): Map[(Address, String), Height]
private[database] def addressId(address: Address): Option[AddressId] = addressIdCache.get(address)
private[database] def addressIds(addresses: Seq[Address]): Map[Address, Option[AddressId]] =
@@ -299,15 +299,15 @@ abstract class Caches extends Blockchain with Storage {
(key, entry) <- entries
} yield ((address, key), entry)
- val cachedEntries = accountDataCache.getAllPresent(newEntries.keys.asJava).asScala
- val loadedPrevEntries = loadEntryHeights(newEntries.keys.filterNot(cachedEntries.contains), addressIdWithFallback(_, newAddressIds))
+ val cachedEntries = accountDataCache.getAllPresent(newEntries.keys.asJava).asScala
+ val loadedPrevEntryHeights = loadEntryHeights(newEntries.keys.filterNot(cachedEntries.contains).toSeq, addressIdWithFallback(_, newAddressIds))
val updatedDataWithNodes = (for {
- (k, currentEntry) <- cachedEntries.view.mapValues(_.height) ++ loadedPrevEntries
- newEntry <- newEntries.get(k)
+ (k, heightOfPreviousEntry) <- cachedEntries.view.mapValues(_.height) ++ loadedPrevEntryHeights
+ newEntry <- newEntries.get(k)
} yield k -> (
- CurrentData(newEntry, Height(height), currentEntry),
- DataNode(newEntry, currentEntry)
+ CurrentData(newEntry, Height(height), heightOfPreviousEntry),
+ DataNode(newEntry, heightOfPreviousEntry)
)).toMap
val orderFillsWithNodes = for {
diff --git a/node/src/main/scala/com/wavesplatform/database/DBResource.scala b/node/src/main/scala/com/wavesplatform/database/DBResource.scala
index 5217cbbcffb..7d3f17515b8 100644
--- a/node/src/main/scala/com/wavesplatform/database/DBResource.scala
+++ b/node/src/main/scala/com/wavesplatform/database/DBResource.scala
@@ -1,6 +1,6 @@
package com.wavesplatform.database
-import org.rocksdb.{ReadOptions, RocksDB, RocksIterator}
+import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB, RocksIterator}
import scala.collection.View
import scala.collection.mutable.ArrayBuffer
@@ -18,9 +18,9 @@ trait DBResource extends AutoCloseable {
}
object DBResource {
- def apply(db: RocksDB): DBResource = new DBResource {
+ def apply(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None): DBResource = new DBResource {
private[this] val snapshot = db.getSnapshot
- private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false)
+ private[this] val readOptions = new ReadOptions().setSnapshot(snapshot)
override def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes))
@@ -35,9 +35,11 @@ object DBResource {
def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] =
db.multiGet(readOptions, keys, valBufferSize)
- override lazy val prefixIterator: RocksIterator = db.newIterator(readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true))
+ override lazy val prefixIterator: RocksIterator =
+ db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true))
- override lazy val fullIterator: RocksIterator = db.newIterator(readOptions.setTotalOrderSeek(true))
+ override lazy val fullIterator: RocksIterator =
+ db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true))
override def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized {
if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed
diff --git a/node/src/main/scala/com/wavesplatform/database/KeyHelpers.scala b/node/src/main/scala/com/wavesplatform/database/KeyHelpers.scala
index 74bd3fe903f..3176c591661 100644
--- a/node/src/main/scala/com/wavesplatform/database/KeyHelpers.scala
+++ b/node/src/main/scala/com/wavesplatform/database/KeyHelpers.scala
@@ -3,6 +3,7 @@ package com.wavesplatform.database
import com.google.common.primitives.{Bytes, Ints, Longs, Shorts}
import com.wavesplatform.state
import com.wavesplatform.state.{Height, TxNum}
+import org.rocksdb.ColumnFamilyHandle
import java.nio.ByteBuffer
@@ -32,8 +33,8 @@ object KeyHelpers {
Ints.toByteArray
)
- def bytesSeqNr(keyTag: KeyTags.KeyTag, suffix: Array[Byte], default: Int = 0): Key[Int] =
- Key(keyTag, suffix, v => if (v != null && v.length >= Ints.BYTES) Ints.fromByteArray(v) else default, Ints.toByteArray)
+ def bytesSeqNr(keyTag: KeyTags.KeyTag, suffix: Array[Byte], default: Int = 0, cfh: Option[ColumnFamilyHandle] = None): Key[Int] =
+ Key(keyTag, suffix, v => if (v != null && v.length >= Ints.BYTES) Ints.fromByteArray(v) else default, Ints.toByteArray, cfh)
def unsupported[A](message: String): A => Array[Byte] = _ => throw new UnsupportedOperationException(message)
}
diff --git a/node/src/main/scala/com/wavesplatform/database/Keys.scala b/node/src/main/scala/com/wavesplatform/database/Keys.scala
index 064366ace27..0323a236dac 100644
--- a/node/src/main/scala/com/wavesplatform/database/Keys.scala
+++ b/node/src/main/scala/com/wavesplatform/database/Keys.scala
@@ -167,7 +167,7 @@ object Keys {
Some(cfHandle.handle)
)
- def transactionStateSnapshotAt(height: Height, n: TxNum, cfHandle: RDB.TxHandle): Key[Option[TransactionStateSnapshot]] =
+ def transactionStateSnapshotAt(height: Height, n: TxNum, cfHandle: RDB.TxSnapshotHandle): Key[Option[TransactionStateSnapshot]] =
Key.opt[TransactionStateSnapshot](
NthTransactionStateSnapshotAtHeight,
hNum(height, n),
@@ -176,26 +176,28 @@ object Keys {
Some(cfHandle.handle)
)
- def addressTransactionSeqNr(addressId: AddressId): Key[Int] =
- bytesSeqNr(AddressTransactionSeqNr, addressId.toByteArray)
+ def addressTransactionSeqNr(addressId: AddressId, cfh: RDB.ApiHandle): Key[Int] =
+ bytesSeqNr(AddressTransactionSeqNr, addressId.toByteArray, cfh = Some(cfh.handle))
- def addressTransactionHN(addressId: AddressId, seqNr: Int): Key[Option[(Height, Seq[(Byte, TxNum, Int)])]] =
+ def addressTransactionHN(addressId: AddressId, seqNr: Int, cfh: RDB.ApiHandle): Key[Option[(Height, Seq[(Byte, TxNum, Int)])]] =
Key.opt(
AddressTransactionHeightTypeAndNums,
hBytes(addressId.toByteArray, seqNr),
readTransactionHNSeqAndType,
- writeTransactionHNSeqAndType
+ writeTransactionHNSeqAndType,
+ Some(cfh.handle)
)
- def addressLeaseSeqNr(addressId: AddressId): Key[Int] =
- bytesSeqNr(AddressLeaseInfoSeqNr, addressId.toByteArray)
+ def addressLeaseSeqNr(addressId: AddressId, cfh: RDB.ApiHandle): Key[Int] =
+ bytesSeqNr(AddressLeaseInfoSeqNr, addressId.toByteArray, cfh = Some(cfh.handle))
- def addressLeaseSeq(addressId: AddressId, seqNr: Int): Key[Option[Seq[ByteStr]]] =
+ def addressLeaseSeq(addressId: AddressId, seqNr: Int, cfh: RDB.ApiHandle): Key[Option[Seq[ByteStr]]] =
Key.opt(
AddressLeaseInfoSeq,
hBytes(addressId.toByteArray, seqNr),
readLeaseIdSeq,
- writeLeaseIdSeq
+ writeLeaseIdSeq,
+ Some(cfh.handle)
)
def transactionMetaById(txId: TransactionId, cfh: RDB.TxMetaHandle): Key[Option[TransactionMeta]] =
@@ -207,8 +209,8 @@ object Keys {
Some(cfh.handle)
)
- def invokeScriptResult(height: Int, txNum: TxNum): Key[Option[InvokeScriptResult]] =
- Key.opt(InvokeScriptResultTag, hNum(height, txNum), InvokeScriptResult.fromBytes, InvokeScriptResult.toBytes)
+ def invokeScriptResult(height: Int, txNum: TxNum, cfh: RDB.ApiHandle): Key[Option[InvokeScriptResult]] =
+ Key.opt(InvokeScriptResultTag, hNum(height, txNum), InvokeScriptResult.fromBytes, InvokeScriptResult.toBytes, Some(cfh.handle))
val disabledAliases: Key[Set[Alias]] = Key(
DisabledAliases,
@@ -223,11 +225,11 @@ object Keys {
def assetStaticInfo(addr: ERC20Address): Key[Option[StaticAssetInfo]] =
Key.opt(AssetStaticInfo, addr.arr, StaticAssetInfo.parseFrom, _.toByteArray)
- def nftCount(addressId: AddressId): Key[Int] =
- Key(NftCount, addressId.toByteArray, Option(_).fold(0)(Ints.fromByteArray), Ints.toByteArray)
+ def nftCount(addressId: AddressId, cfh: RDB.ApiHandle): Key[Int] =
+ Key(NftCount, addressId.toByteArray, Option(_).fold(0)(Ints.fromByteArray), Ints.toByteArray, Some(cfh.handle))
- def nftAt(addressId: AddressId, index: Int, assetId: IssuedAsset): Key[Option[Unit]] =
- Key.opt(NftPossession, addressId.toByteArray ++ Longs.toByteArray(index) ++ assetId.id.arr, _ => (), _ => Array.emptyByteArray)
+ def nftAt(addressId: AddressId, index: Int, assetId: IssuedAsset, cfh: RDB.ApiHandle): Key[Option[Unit]] =
+ Key.opt(NftPossession, addressId.toByteArray ++ Longs.toByteArray(index) ++ assetId.id.arr, _ => (), _ => Array.emptyByteArray, Some(cfh.handle))
def stateHash(height: Int): Key[Option[StateHash]] =
Key.opt(StateHash, h(height), readStateHash, writeStateHash)
@@ -235,8 +237,8 @@ object Keys {
def blockStateHash(height: Int): Key[ByteStr] =
Key(BlockStateHash, h(height), Option(_).fold(TxStateSnapshotHashBuilder.InitStateHash)(ByteStr(_)), _.arr)
- def ethereumTransactionMeta(height: Height, txNum: TxNum): Key[Option[EthereumTransactionMeta]] =
- Key.opt(EthereumTransactionMetaTag, hNum(height, txNum), EthereumTransactionMeta.parseFrom, _.toByteArray)
+ def ethereumTransactionMeta(height: Height, txNum: TxNum, cfh: RDB.ApiHandle): Key[Option[EthereumTransactionMeta]] =
+ Key.opt(EthereumTransactionMetaTag, hNum(height, txNum), EthereumTransactionMeta.parseFrom, _.toByteArray, Some(cfh.handle))
def maliciousMinerBanHeights(addressBytes: Array[Byte]): Key[Seq[Int]] =
historyKey(MaliciousMinerBanHeights, addressBytes)
diff --git a/node/src/main/scala/com/wavesplatform/database/RDB.scala b/node/src/main/scala/com/wavesplatform/database/RDB.scala
index 00a3e70077e..7c9bb13a791 100644
--- a/node/src/main/scala/com/wavesplatform/database/RDB.scala
+++ b/node/src/main/scala/com/wavesplatform/database/RDB.scala
@@ -1,7 +1,7 @@
package com.wavesplatform.database
import com.typesafe.scalalogging.StrictLogging
-import com.wavesplatform.database.RDB.{TxHandle, TxMetaHandle}
+import com.wavesplatform.database.RDB.{ApiHandle, TxHandle, TxMetaHandle, TxSnapshotHandle}
import com.wavesplatform.settings.DBSettings
import com.wavesplatform.utils.*
import org.rocksdb.*
@@ -16,7 +16,8 @@ final class RDB(
val db: RocksDB,
val txMetaHandle: TxMetaHandle,
val txHandle: TxHandle,
- val txSnapshotHandle: TxHandle,
+ val txSnapshotHandle: TxSnapshotHandle,
+ val apiHandle: ApiHandle,
acquiredResources: Seq[RocksObject]
) extends AutoCloseable {
override def close(): Unit = {
@@ -28,6 +29,9 @@ final class RDB(
object RDB extends StrictLogging {
final class TxMetaHandle private[RDB] (val handle: ColumnFamilyHandle)
final class TxHandle private[RDB] (val handle: ColumnFamilyHandle)
+ final class TxSnapshotHandle private[RDB] (val handle: ColumnFamilyHandle)
+ final class ApiHandle private[RDB] (val handle: ColumnFamilyHandle)
+
case class OptionsWithResources[A](options: A, resources: Seq[RocksObject])
def open(settings: DBSettings): RDB = {
@@ -36,18 +40,15 @@ object RDB extends StrictLogging {
logger.debug(s"Open DB at ${settings.directory}")
val dbOptions = createDbOptions(settings)
-
- val dbDir = file.getAbsoluteFile
+ val dbDir = file.getAbsoluteFile
dbDir.getParentFile.mkdirs()
- val handles = new util.ArrayList[ColumnFamilyHandle]()
- val defaultCfOptions = newColumnFamilyOptions(12.0, 16 << 10, settings.rocksdb.mainCacheSize, 0.6, settings.rocksdb.writeBufferSize)
- val defaultCfCompressionForLevels = CompressionType.NO_COMPRESSION :: // Disable compaction for L0, because it is predictable and small
- List.fill(defaultCfOptions.options.numLevels() - 1)(CompressionType.LZ4_COMPRESSION)
-
+ val handles = new util.ArrayList[ColumnFamilyHandle]()
+ val defaultCfOptions = newColumnFamilyOptions(12.0, 16 << 10, settings.rocksdb.mainCacheSize, 0.6, settings.rocksdb.writeBufferSize)
val txMetaCfOptions = newColumnFamilyOptions(10.0, 2 << 10, settings.rocksdb.txMetaCacheSize, 0.9, settings.rocksdb.writeBufferSize)
val txCfOptions = newColumnFamilyOptions(10.0, 2 << 10, settings.rocksdb.txCacheSize, 0.9, settings.rocksdb.writeBufferSize)
val txSnapshotCfOptions = newColumnFamilyOptions(10.0, 2 << 10, settings.rocksdb.txSnapshotCacheSize, 0.9, settings.rocksdb.writeBufferSize)
+ val apiCfOptions = newColumnFamilyOptions(10.0, 2 << 10, settings.rocksdb.apiCacheSize, 0.9, settings.rocksdb.writeBufferSize)
val db = RocksDB.open(
dbOptions.options,
settings.directory,
@@ -56,7 +57,6 @@ object RDB extends StrictLogging {
RocksDB.DEFAULT_COLUMN_FAMILY,
defaultCfOptions.options
.setMaxWriteBufferNumber(3)
- .setCompressionPerLevel(defaultCfCompressionForLevels.asJava)
.setCfPaths(Seq(new DbPath(new File(dbDir, "default").toPath, 0L)).asJava)
),
new ColumnFamilyDescriptor(
@@ -75,6 +75,11 @@ object RDB extends StrictLogging {
"tx-snapshot".utf8Bytes,
txSnapshotCfOptions.options
.setCfPaths(Seq(new DbPath(new File(dbDir, "tx-snapshot").toPath, 0L)).asJava)
+ ),
+ new ColumnFamilyDescriptor(
+ "api".utf8Bytes,
+ apiCfOptions.options
+ .setCfPaths(Seq(new DbPath(new File(dbDir, "api").toPath, 0L)).asJava)
)
).asJava,
handles
@@ -84,7 +89,8 @@ object RDB extends StrictLogging {
db,
new TxMetaHandle(handles.get(1)),
new TxHandle(handles.get(2)),
- new TxHandle(handles.get(3)),
+ new TxSnapshotHandle(handles.get(3)),
+ new ApiHandle(handles.get(4)),
dbOptions.resources ++ defaultCfOptions.resources ++ txMetaCfOptions.resources ++ txCfOptions.resources ++ txSnapshotCfOptions.resources
)
}
@@ -109,7 +115,6 @@ object RDB extends StrictLogging {
.setPinL0FilterAndIndexBlocksInCache(true)
.setFormatVersion(5)
.setBlockSize(blockSize)
- .setChecksumType(ChecksumType.kNoChecksum)
.setBlockCache(blockCache)
.setCacheIndexAndFilterBlocksWithHighPriority(true)
.setDataBlockIndexType(DataBlockIndexType.kDataBlockBinaryAndHash)
@@ -132,12 +137,13 @@ object RDB extends StrictLogging {
private def createDbOptions(settings: DBSettings): OptionsWithResources[DBOptions] = {
val dbOptions = new DBOptions()
.setCreateIfMissing(true)
- .setParanoidChecks(true)
+ .setParanoidChecks(settings.rocksdb.paranoidChecks)
.setIncreaseParallelism(6)
.setBytesPerSync(2 << 20)
.setCreateMissingColumnFamilies(true)
.setMaxOpenFiles(100)
.setMaxSubcompactions(2) // Write stalls expected without this option. Can lead to max_background_jobs * max_subcompactions background threads
+ .setMaxManifestFileSize(200 << 20)
if (settings.rocksdb.enableStatistics) {
val statistics = new Statistics()
diff --git a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala
index aab88daa932..c2f8361fceb 100644
--- a/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala
+++ b/node/src/main/scala/com/wavesplatform/database/RocksDBWriter.scala
@@ -38,6 +38,7 @@ import org.slf4j.LoggerFactory
import sun.nio.ch.Util
import java.nio.ByteBuffer
+import java.time.Duration
import java.util
import java.util.concurrent.*
import scala.annotation.tailrec
@@ -115,14 +116,12 @@ object RocksDBWriter extends ScorexLogging {
settings: BlockchainSettings,
dbSettings: DBSettings,
isLightMode: Boolean,
- bfBlockInsertions: Int = 10000,
forceCleanupExecutorService: Option[ExecutorService] = None
): RocksDBWriter = new RocksDBWriter(
rdb,
settings,
dbSettings,
isLightMode,
- bfBlockInsertions,
dbSettings.cleanupInterval match {
case None => MoreExecutors.newDirectExecutorService() // We don't care if disabled
case Some(_) =>
@@ -147,7 +146,6 @@ class RocksDBWriter(
val settings: BlockchainSettings,
val dbSettings: DBSettings,
isLightMode: Boolean,
- bfBlockInsertions: Int = 10000,
cleanupExecutorService: ExecutorService
) extends Caches
with AutoCloseable {
@@ -215,20 +213,18 @@ class RocksDBWriter(
writableDB.get(Keys.data(addressId, key))
}
- override protected def loadEntryHeights(keys: Iterable[(Address, String)], addressIdOf: Address => AddressId): Map[(Address, String), Height] = {
- val keyBufs = database.getKeyBuffersFromKeys(keys.map { case (addr, k) => Keys.data(addressIdOf(addr), k) }.toVector)
- val valBufs = database.getValueBuffers(keys.size, 8)
- val valueBuf = new Array[Byte](8)
+ override protected def loadEntryHeights(keys: Seq[(Address, String)], addressIdOf: Address => AddressId): Map[(Address, String), Height] = {
+ val keyBufs = database.getKeyBuffersFromKeys(keys.view.map { case (addr, k) => Keys.data(addressIdOf(addr), k) }.toVector)
+ val valBufs = database.getValueBuffers(keys.size, 4)
val result = rdb.db
.multiGetByteBuffers(keyBufs.asJava, valBufs.asJava)
.asScala
.view
.zip(keys)
- .map { case (status, k @ (_, key)) =>
+ .map { case (status, k) =>
if (status.status.getCode == Status.Code.Ok) {
- status.value.get(valueBuf)
- k -> readCurrentData(key)(valueBuf).height
+ k -> Height(status.value.getInt)
} else k -> Height(0)
}
.toMap
@@ -399,11 +395,11 @@ class RocksDBWriter(
}
for ((addressId, nftIds) <- updatedNftLists.asMap().asScala) {
- val kCount = Keys.nftCount(AddressId(addressId.toLong))
+ val kCount = Keys.nftCount(AddressId(addressId.toLong), rdb.apiHandle)
val previousNftCount = rw.get(kCount)
rw.put(kCount, previousNftCount + nftIds.size())
for ((id, idx) <- nftIds.asScala.zipWithIndex) {
- rw.put(Keys.nftAt(AddressId(addressId.toLong), previousNftCount + idx, id), Some(()))
+ rw.put(Keys.nftAt(AddressId(addressId.toLong), previousNftCount + idx, id, rdb.apiHandle), Some(()))
}
}
@@ -430,32 +426,52 @@ class RocksDBWriter(
}
}
- // todo: instead of fixed-size block batches, store fixed-time batches
- private val BlockStep = 200
- private def mkFilter() = BloomFilter.create[Array[Byte]](Funnels.byteArrayFunnel(), BlockStep * bfBlockInsertions, 0.01f)
- private def initFilters(): (BloomFilter[Array[Byte]], BloomFilter[Array[Byte]]) = {
- def loadFilter(heights: Seq[Int]): BloomFilter[Array[Byte]] = {
- val filter = mkFilter()
- heights.filter(_ > 0).foreach { h =>
- loadTransactions(Height(h), rdb).foreach { case (_, tx) => filter.put(tx.id().arr) }
+ private var TxFilterResetTs = lastBlock.fold(0L)(_.header.timestamp)
+ private def mkFilter() = BloomFilter.create[Array[Byte]](Funnels.byteArrayFunnel(), 1_000_000, 0.001f)
+ private var currentTxFilter = mkFilter()
+ private var prevTxFilter = lastBlock match {
+ case Some(b) =>
+ TxFilterResetTs = b.header.timestamp
+ val prevFilter = mkFilter()
+
+ var fromHeight = height
+ Using(writableDB.newIterator()) { iter =>
+ iter.seek(Keys.blockMetaAt(Height(height)).keyBytes)
+ var lastBlockTs = TxFilterResetTs
+
+ while (
+ iter.isValid &&
+ iter.key().startsWith(KeyTags.BlockInfoAtHeight.prefixBytes) &&
+ (TxFilterResetTs - lastBlockTs) < settings.functionalitySettings.maxTransactionTimeBackOffset.toMillis * 2
+ ) {
+ lastBlockTs = readBlockMeta(iter.value()).getHeader.timestamp
+ fromHeight = Ints.fromByteArray(iter.key().drop(2))
+ iter.prev()
+ }
}
- filter
- }
- val lastFilterStart = (height / BlockStep) * BlockStep + 1
- val prevFilterStart = lastFilterStart - BlockStep
- val (bf0Heights, bf1Heights) = if ((height / BlockStep) % 2 == 0) {
- (lastFilterStart to height, prevFilterStart until lastFilterStart)
- } else {
- (prevFilterStart until lastFilterStart, lastFilterStart to height)
- }
- (loadFilter(bf0Heights), loadFilter(bf1Heights))
- }
+ Using(writableDB.newIterator(rdb.txHandle.handle)) { iter =>
+ var counter = 0
+ iter.seek(Keys.transactionAt(Height(fromHeight), TxNum(0.toShort), rdb.txHandle).keyBytes)
+ while (
+ iter.isValid &&
+ iter.key().startsWith(KeyTags.NthTransactionInfoAtHeight.prefixBytes) &&
+ Ints.fromByteArray(iter.key().slice(2, 6)) <= height
+ ) {
+ counter += 1
+ prevFilter.put(readTransaction(Height(0))(iter.value())._2.id().arr)
+ iter.next()
+ }
+ log.debug(s"Loaded $counter tx IDs from [$fromHeight, $height]. Filter size is ${memMeter.measureDeep(prevFilter)} bytes")
+ }
- private var (bf0, bf1) = initFilters()
+ prevFilter
+ case None =>
+ mkFilter()
+ }
override def containsTransaction(tx: Transaction): Boolean =
- (bf0.mightContain(tx.id().arr) || bf1.mightContain(tx.id().arr)) && {
+ (prevTxFilter.mightContain(tx.id().arr) || currentTxFilter.mightContain(tx.id().arr)) && {
writableDB.get(Keys.transactionMetaById(TransactionId(tx.id()), rdb.txMetaHandle)).isDefined
}
@@ -586,14 +602,13 @@ class RocksDBWriter(
rw.put(Keys.assetScript(asset)(height), Some(script))
}
- if (height % BlockStep == 1) {
- if ((height / BlockStep) % 2 == 0) {
- bf0 = mkFilter()
- } else {
- bf1 = mkFilter()
- }
+ if (blockMeta.getHeader.timestamp - TxFilterResetTs > settings.functionalitySettings.maxTransactionTimeBackOffset.toMillis * 2) {
+ log.trace(s"Rotating filter at $height, prev ts = $TxFilterResetTs, new ts = ${blockMeta.getHeader.timestamp}, interval = ${Duration
+ .ofMillis(blockMeta.getHeader.timestamp - TxFilterResetTs)}")
+ TxFilterResetTs = blockMeta.getHeader.timestamp
+ prevTxFilter = currentTxFilter
+ currentTxFilter = mkFilter()
}
- val targetBf = if ((height / BlockStep) % 2 == 0) bf0 else bf1
val transactionsWithSize =
snapshot.transactions.zipWithIndex.map { case ((id, txInfo), i) =>
@@ -608,14 +623,14 @@ class RocksDBWriter(
Some(PBSnapshots.toProtobuf(txInfo.snapshot, txInfo.status))
)
rw.put(Keys.transactionMetaById(txId, rdb.txMetaHandle), Some(TransactionMeta(height, num, tx.tpe.id, meta.status.protobuf, 0, size)))
- targetBf.put(id.arr)
+ currentTxFilter.put(id.arr)
txId -> (num, tx, size)
}.toMap
if (dbSettings.storeTransactionsByAddress) {
val addressTxs = addressTransactions.asScala.toSeq.map { case (aid, txIds) =>
- (aid, txIds, Keys.addressTransactionSeqNr(aid))
+ (aid, txIds, Keys.addressTransactionSeqNr(aid, rdb.apiHandle))
}
rw.multiGetInts(addressTxs.view.map(_._3).toVector)
.zip(addressTxs)
@@ -625,7 +640,7 @@ class RocksDBWriter(
val (num, tx, size) = transactionsWithSize(txId)
(tx.tpe.id.toByte, num, size)
}.toSeq
- rw.put(Keys.addressTransactionHN(addressId, nextSeqNr), Some((Height(height), txTypeNumSeq.sortBy(-_._2))))
+ rw.put(Keys.addressTransactionHN(addressId, nextSeqNr, rdb.apiHandle), Some((Height(height), txTypeNumSeq.sortBy(-_._2))))
rw.put(txSeqNrKey, nextSeqNr)
}
}
@@ -637,13 +652,15 @@ class RocksDBWriter(
address <- Seq(details.recipientAddress, details.sender.toAddress)
addressId = this.addressIdWithFallback(address, newAddresses)
} yield (addressId, leaseId)
- val leaseIdsByAddressId = addressIdWithLeaseIds.groupMap { case (addressId, _) => (addressId, Keys.addressLeaseSeqNr(addressId)) }(_._2).toSeq
+ val leaseIdsByAddressId = addressIdWithLeaseIds.groupMap { case (addressId, _) =>
+ (addressId, Keys.addressLeaseSeqNr(addressId, rdb.apiHandle))
+ }(_._2).toSeq
rw.multiGetInts(leaseIdsByAddressId.view.map(_._1._2).toVector)
.zip(leaseIdsByAddressId)
.foreach { case (prevSeqNr, ((addressId, leaseSeqKey), leaseIds)) =>
val nextSeqNr = prevSeqNr.getOrElse(0) + 1
- rw.put(Keys.addressLeaseSeq(addressId, nextSeqNr), Some(leaseIds))
+ rw.put(Keys.addressLeaseSeq(addressId, nextSeqNr, rdb.apiHandle), Some(leaseIds))
rw.put(leaseSeqKey, nextSeqNr)
}
}
@@ -698,7 +715,7 @@ class RocksDBWriter(
})
.getOrElse(throw new IllegalArgumentException(s"Couldn't find transaction height and num: $txId"))
- try rw.put(Keys.invokeScriptResult(txHeight, txNum), Some(result))
+ try rw.put(Keys.invokeScriptResult(txHeight, txNum, rdb.apiHandle), Some(result))
catch {
case NonFatal(e) =>
throw new RuntimeException(s"Error storing invoke script result for $txId: $result", e)
@@ -707,7 +724,7 @@ class RocksDBWriter(
for ((txId, pbMeta) <- snapshot.ethereumTransactionMeta) {
val txNum = transactionsWithSize(TransactionId @@ txId)._1
- val key = Keys.ethereumTransactionMeta(Height(height), txNum)
+ val key = Keys.ethereumTransactionMeta(Height(height), txNum, rdb.apiHandle)
rw.put(key, Some(pbMeta))
}
@@ -975,11 +992,9 @@ class RocksDBWriter(
for ((addressId, address) <- changedAddresses) {
for (k <- rw.get(Keys.changedDataKeys(currentHeight, addressId))) {
- log.trace(s"Discarding $k for $address at $currentHeight")
accountDataToInvalidate += (address -> k)
- rw.delete(Keys.dataAt(addressId, k)(currentHeight))
- rollbackDataHistory(rw, Keys.data(addressId, k), Keys.dataAt(addressId, k)(_), currentHeight)
+ rollbackDataEntry(rw, k, address, addressId, currentHeight)
}
rw.delete(Keys.changedDataKeys(currentHeight, addressId))
@@ -993,9 +1008,9 @@ class RocksDBWriter(
discardLeaseBalance(address)
if (dbSettings.storeTransactionsByAddress) {
- val kTxSeqNr = Keys.addressTransactionSeqNr(addressId)
+ val kTxSeqNr = Keys.addressTransactionSeqNr(addressId, rdb.apiHandle)
val txSeqNr = rw.get(kTxSeqNr)
- val kTxHNSeq = Keys.addressTransactionHN(addressId, txSeqNr)
+ val kTxHNSeq = Keys.addressTransactionHN(addressId, txSeqNr, rdb.apiHandle)
rw.get(kTxHNSeq).collect { case (`currentHeight`, _) =>
rw.delete(kTxHNSeq)
@@ -1004,9 +1019,9 @@ class RocksDBWriter(
}
if (dbSettings.storeLeaseStatesByAddress) {
- val leaseSeqNrKey = Keys.addressLeaseSeqNr(addressId)
+ val leaseSeqNrKey = Keys.addressLeaseSeqNr(addressId, rdb.apiHandle)
val leaseSeqNr = rw.get(leaseSeqNrKey)
- val leaseSeqKey = Keys.addressLeaseSeq(addressId, leaseSeqNr)
+ val leaseSeqKey = Keys.addressLeaseSeq(addressId, leaseSeqNr, rdb.apiHandle)
rw.get(leaseSeqKey)
.flatMap(_.headOption)
.flatMap(leaseDetails)
@@ -1054,7 +1069,7 @@ class RocksDBWriter(
case _: DataTransaction => // see changed data keys removal
case _: InvokeScriptTransaction | _: InvokeExpressionTransaction =>
- rw.delete(Keys.invokeScriptResult(currentHeight, num))
+ rw.delete(Keys.invokeScriptResult(currentHeight, num, rdb.apiHandle))
case tx: CreateAliasTransaction =>
rw.delete(Keys.addressIdOfAlias(tx.alias))
@@ -1063,7 +1078,7 @@ class RocksDBWriter(
ordersToInvalidate += rollbackOrderFill(rw, tx.buyOrder.id(), currentHeight)
ordersToInvalidate += rollbackOrderFill(rw, tx.sellOrder.id(), currentHeight)
case _: EthereumTransaction =>
- rw.delete(Keys.ethereumTransactionMeta(currentHeight, num))
+ rw.delete(Keys.ethereumTransactionMeta(currentHeight, num, rdb.apiHandle))
}
if (tx.tpe != TransactionType.Genesis) {
@@ -1132,14 +1147,20 @@ class RocksDBWriter(
discardedBlocks.reverse
}
- private def rollbackDataHistory(rw: RW, currentDataKey: Key[CurrentData], dataNodeKey: Height => Key[DataNode], currentHeight: Height): Unit = {
- val currentData = rw.get(currentDataKey)
+ private def rollbackDataEntry(rw: RW, key: String, address: Address, addressId: AddressId, currentHeight: Height): Unit = {
+ val currentDataKey = Keys.data(addressId, key)
+ val currentData = rw.get(currentDataKey)
+ rw.delete(Keys.dataAt(addressId, key)(currentHeight))
if (currentData.height == currentHeight) {
- val prevDataNode = rw.get(dataNodeKey(currentData.prevHeight))
- rw.delete(dataNodeKey(currentHeight))
- prevDataNode.entry match {
- case _: EmptyDataEntry => rw.delete(currentDataKey)
- case _ => rw.put(currentDataKey, CurrentData(prevDataNode.entry, currentData.prevHeight, prevDataNode.prevHeight))
+ if (currentData.prevHeight > 0) {
+ val prevDataNode = rw.get(Keys.dataAt(addressId, key)(currentData.prevHeight))
+ log.trace(
+ s"PUT $address($addressId)/$key: ${currentData.entry}@$currentHeight => ${prevDataNode.entry}@${currentData.prevHeight}>${prevDataNode.prevHeight}"
+ )
+ rw.put(currentDataKey, CurrentData(prevDataNode.entry, currentData.prevHeight, prevDataNode.prevHeight))
+ } else {
+ log.trace(s"DEL $address($addressId)/$key: ${currentData.entry}@$currentHeight => EMPTY@${currentData.prevHeight}")
+ rw.delete(currentDataKey)
}
}
}
diff --git a/node/src/main/scala/com/wavesplatform/database/package.scala b/node/src/main/scala/com/wavesplatform/database/package.scala
index b147da9fa5a..b0fc6bc5400 100644
--- a/node/src/main/scala/com/wavesplatform/database/package.scala
+++ b/node/src/main/scala/com/wavesplatform/database/package.scala
@@ -423,7 +423,7 @@ package object database {
def withReadOptions[A](f: ReadOptions => A): A = {
val snapshot = db.getSnapshot
- val ro = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false)
+ val ro = new ReadOptions().setSnapshot(snapshot)
try f(ro)
finally {
ro.close()
@@ -536,7 +536,11 @@ package object database {
} finally iterator.close()
}
- def resourceObservable: Observable[DBResource] = Observable.resource(Task(DBResource(db)))(r => Task(r.close()))
+ def resourceObservable: Observable[DBResource] =
+ Observable.resource(Task(DBResource(db, None)))(r => Task(r.close()))
+
+ def resourceObservable(iteratorCfHandle: ColumnFamilyHandle): Observable[DBResource] =
+ Observable.resource(Task(DBResource(db, Some(iteratorCfHandle))))(r => Task(r.close()))
def withResource[A](f: DBResource => A): A = {
val resource = DBResource(db)
@@ -544,6 +548,12 @@ package object database {
finally resource.close()
}
+ def withResource[A](iteratorCfHandle: ColumnFamilyHandle)(f: DBResource => A): A = {
+ val resource = DBResource(db, Some(iteratorCfHandle))
+ try f(resource)
+ finally resource.close()
+ }
+
private def multiGetOpt[A](
readOptions: ReadOptions,
keys: collection.IndexedSeq[Key[Option[A]]],
diff --git a/node/src/main/scala/com/wavesplatform/history/StorageFactory.scala b/node/src/main/scala/com/wavesplatform/history/StorageFactory.scala
index 3c3ece9bf70..9fd841ebb93 100644
--- a/node/src/main/scala/com/wavesplatform/history/StorageFactory.scala
+++ b/node/src/main/scala/com/wavesplatform/history/StorageFactory.scala
@@ -9,7 +9,7 @@ import com.wavesplatform.utils.{ScorexLogging, Time, UnsupportedFeature, forceSt
import org.rocksdb.RocksDB
object StorageFactory extends ScorexLogging {
- private val StorageVersion = 1
+ private val StorageVersion = 2
def apply(
settings: WavesSettings,
diff --git a/node/src/main/scala/com/wavesplatform/settings/DBSettings.scala b/node/src/main/scala/com/wavesplatform/settings/DBSettings.scala
index 511bd6861d6..906e085557c 100644
--- a/node/src/main/scala/com/wavesplatform/settings/DBSettings.scala
+++ b/node/src/main/scala/com/wavesplatform/settings/DBSettings.scala
@@ -1,5 +1,4 @@
package com.wavesplatform.settings
-import scala.concurrent.duration.FiniteDuration
case class DBSettings(
directory: String,
@@ -10,7 +9,5 @@ case class DBSettings(
maxCacheSize: Int,
maxRollbackDepth: Int,
cleanupInterval: Option[Int] = None,
- rememberBlocks: FiniteDuration,
- useBloomFilter: Boolean,
- rocksdb: RocksDBSettings
+ rocksdb: RocksDBSettings,
)
diff --git a/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala b/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala
index a31434526b2..30ad8d172a7 100644
--- a/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala
+++ b/node/src/main/scala/com/wavesplatform/settings/RocksDBSettings.scala
@@ -5,6 +5,8 @@ case class RocksDBSettings(
txCacheSize: SizeInBytes,
txMetaCacheSize: SizeInBytes,
txSnapshotCacheSize: SizeInBytes,
+ apiCacheSize: SizeInBytes,
writeBufferSize: SizeInBytes,
- enableStatistics: Boolean
+ enableStatistics: Boolean,
+ paranoidChecks: Boolean
)
diff --git a/node/src/test/resources/application.conf b/node/src/test/resources/application.conf
index 1604353306e..98841a94ec5 100644
--- a/node/src/test/resources/application.conf
+++ b/node/src/test/resources/application.conf
@@ -4,8 +4,7 @@ waves {
wallet.password = "some string as password"
db {
- cleanup-interval = null # Disable in tests by default
-
+ max-cache-size = 1
rocksdb {
main-cache-size = 1K
tx-cache-size = 1K
diff --git a/node/src/test/scala/com/wavesplatform/database/TestStorageFactory.scala b/node/src/test/scala/com/wavesplatform/database/TestStorageFactory.scala
index 5bc42e0dfb6..b8aab174712 100644
--- a/node/src/test/scala/com/wavesplatform/database/TestStorageFactory.scala
+++ b/node/src/test/scala/com/wavesplatform/database/TestStorageFactory.scala
@@ -18,7 +18,6 @@ object TestStorageFactory {
settings.blockchainSettings,
settings.dbSettings,
settings.enableLightMode,
- 100,
Some(MoreExecutors.newDirectExecutorService())
)
(
diff --git a/node/src/test/scala/com/wavesplatform/db/InterferableDB.scala b/node/src/test/scala/com/wavesplatform/db/InterferableDB.scala
index 58bd629cef3..5e07e7de21b 100644
--- a/node/src/test/scala/com/wavesplatform/db/InterferableDB.scala
+++ b/node/src/test/scala/com/wavesplatform/db/InterferableDB.scala
@@ -1,10 +1,17 @@
package com.wavesplatform.db
-import org.rocksdb.{ReadOptions, RocksDB, RocksIterator}
+import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB, RocksIterator}
import java.util.concurrent.locks.Lock
case class InterferableDB(db: RocksDB, startRead: Lock) extends RocksDB(db.getNativeHandle) {
+ override def getDefaultColumnFamily: ColumnFamilyHandle = db.getDefaultColumnFamily
+
+ override def newIterator(columnFamilyHandle: ColumnFamilyHandle, readOptions: ReadOptions): RocksIterator = {
+ startRead.lock()
+ db.newIterator(columnFamilyHandle, readOptions)
+ }
+
override def newIterator(options: ReadOptions): RocksIterator = {
startRead.lock()
db.newIterator(options)
diff --git a/node/src/test/scala/com/wavesplatform/db/TxBloomFilterSpec.scala b/node/src/test/scala/com/wavesplatform/db/TxBloomFilterSpec.scala
new file mode 100644
index 00000000000..fd43e9b3e21
--- /dev/null
+++ b/node/src/test/scala/com/wavesplatform/db/TxBloomFilterSpec.scala
@@ -0,0 +1,34 @@
+package com.wavesplatform.db
+
+import com.wavesplatform.db.WithState.AddrWithBalance
+import com.wavesplatform.settings.WavesSettings
+import com.wavesplatform.test.*
+import com.wavesplatform.transaction.TxHelpers
+
+class TxBloomFilterSpec extends PropSpec with SharedDomain {
+ private val richAccount = TxHelpers.signer(1200)
+
+ override def settings: WavesSettings = DomainPresets.TransactionStateSnapshot
+
+ override def genesisBalances: Seq[AddrWithBalance] = Seq(AddrWithBalance(richAccount.toAddress, 10000.waves))
+
+ property("Filter rotation works") {
+ val transfer = TxHelpers.transfer(richAccount, TxHelpers.address(1201), 10.waves)
+ 1 to 8 foreach { _ => domain.appendBlock() }
+ domain.blockchain.height shouldEqual 9
+ domain.appendBlock(transfer) // transfer at height 10
+ domain.appendBlock() // height = 11
+ domain.appendBlock() // solid state height = 11, filters are rotated
+ domain.appendBlockE(transfer) should produce("AlreadyInTheState")
+
+ domain.appendBlock()
+ val tf2 = TxHelpers.transfer(richAccount, TxHelpers.address(1202), 20.waves)
+ domain.appendBlock(tf2)
+ 1 to 20 foreach { _ =>
+ withClue(s"height = ${domain.blockchain.height}") {
+ domain.appendBlockE(tf2) should produce("AlreadyInTheState")
+ }
+ domain.appendBlock()
+ }
+ }
+}
diff --git a/node/src/test/scala/com/wavesplatform/db/WithState.scala b/node/src/test/scala/com/wavesplatform/db/WithState.scala
index 05446794346..cab873284ee 100644
--- a/node/src/test/scala/com/wavesplatform/db/WithState.scala
+++ b/node/src/test/scala/com/wavesplatform/db/WithState.scala
@@ -72,7 +72,7 @@ trait WithState extends BeforeAndAfterAll with DBCacheSettings with Matchers wit
)
Using.resource(rdw)(test)
} finally {
- Seq(rdb.db.getDefaultColumnFamily, rdb.txHandle.handle, rdb.txMetaHandle.handle).foreach { cfh =>
+ Seq(rdb.db.getDefaultColumnFamily, rdb.txHandle.handle, rdb.txMetaHandle.handle, rdb.apiHandle.handle).foreach { cfh =>
rdb.db.deleteRange(cfh, MinKey, MaxKey)
}
}
@@ -395,7 +395,7 @@ trait WithDomain extends WithState { _: Suite =>
try {
val wrappedDb = wrapDB(rdb.db)
assert(wrappedDb.getNativeHandle == rdb.db.getNativeHandle, "wrap function should not create new database instance")
- domain = Domain(new RDB(wrappedDb, rdb.txMetaHandle, rdb.txHandle, rdb.txSnapshotHandle, Seq.empty), bcu, blockchain, settings)
+ domain = Domain(new RDB(wrappedDb, rdb.txMetaHandle, rdb.txHandle, rdb.txSnapshotHandle, rdb.apiHandle, Seq.empty), bcu, blockchain, settings)
val genesis = balances.map { case AddrWithBalance(address, amount) =>
TxHelpers.genesis(address, amount)
}
diff --git a/node/src/test/scala/com/wavesplatform/history/BlockchainUpdaterNFTTest.scala b/node/src/test/scala/com/wavesplatform/history/BlockchainUpdaterNFTTest.scala
index 0b9ff018b22..bee4b3eb7fa 100644
--- a/node/src/test/scala/com/wavesplatform/history/BlockchainUpdaterNFTTest.scala
+++ b/node/src/test/scala/com/wavesplatform/history/BlockchainUpdaterNFTTest.scala
@@ -89,7 +89,7 @@ class BlockchainUpdaterNFTTest extends PropSpec with DomainScenarioDrivenPropert
val persistedNfts = Seq.newBuilder[IssuedAsset]
d.rdb.db.readOnly { ro =>
val addressId = ro.get(Keys.addressId(firstAccount)).get
- ro.iterateOver(KeyTags.NftPossession.prefixBytes ++ addressId.toByteArray) { e =>
+ ro.iterateOver(KeyTags.NftPossession.prefixBytes ++ addressId.toByteArray, Some(d.rdb.apiHandle.handle)) { e =>
persistedNfts += IssuedAsset(ByteStr(e.getKey.takeRight(32)))
}
}
@@ -99,7 +99,6 @@ class BlockchainUpdaterNFTTest extends PropSpec with DomainScenarioDrivenPropert
val settings = settingsWithFeatures(BlockchainFeatures.NG, BlockchainFeatures.ReduceNFTFee)
withDomain(settings)(assert)
- withDomain(settings.copy(dbSettings = settings.dbSettings.copy(useBloomFilter = true)))(assert)
}
}
diff --git a/node/src/test/scala/com/wavesplatform/history/Domain.scala b/node/src/test/scala/com/wavesplatform/history/Domain.scala
index 58def9c43cc..ff6b3155941 100644
--- a/node/src/test/scala/com/wavesplatform/history/Domain.scala
+++ b/node/src/test/scala/com/wavesplatform/history/Domain.scala
@@ -197,7 +197,7 @@ case class Domain(rdb: RDB, blockchainUpdater: BlockchainUpdaterImpl, rocksDBWri
def balance(address: Address): Long = blockchainUpdater.balance(address)
def balance(address: Address, asset: Asset): Long = blockchainUpdater.balance(address, asset)
- def nftList(address: Address): Seq[(IssuedAsset, AssetDescription)] = rdb.db.withResource { resource =>
+ def nftList(address: Address): Seq[(IssuedAsset, AssetDescription)] = rdb.db.withResource(rdb.apiHandle.handle) { resource =>
AddressPortfolio
.nftIterator(resource, address, blockchainUpdater.bestLiquidSnapshot.orEmpty, None, blockchainUpdater.assetDescription)
.toSeq
diff --git a/node/src/test/scala/com/wavesplatform/state/DataKeyRollback.scala b/node/src/test/scala/com/wavesplatform/state/DataKeyRollback.scala
new file mode 100644
index 00000000000..074940ba3a9
--- /dev/null
+++ b/node/src/test/scala/com/wavesplatform/state/DataKeyRollback.scala
@@ -0,0 +1,60 @@
+package com.wavesplatform.state
+
+import com.wavesplatform.db.WithState
+import com.wavesplatform.db.WithState.AddrWithBalance
+import com.wavesplatform.lang.directives.values.V7
+import com.wavesplatform.lang.v1.compiler.TestCompiler
+import com.wavesplatform.settings.WavesSettings
+import com.wavesplatform.test.*
+import com.wavesplatform.transaction.TxHelpers
+
+class DataKeyRollback extends PropSpec with SharedDomain {
+ private val richAccount = TxHelpers.signer(1500)
+
+ override def genesisBalances: Seq[WithState.AddrWithBalance] = Seq(AddrWithBalance(richAccount.toAddress, 10_000_000.waves))
+ override def settings: WavesSettings = DomainPresets.TransactionStateSnapshot
+
+ property("check new entries") {
+ val oracleAccount = TxHelpers.signer(1501)
+ val dappAccount = TxHelpers.signer(1502)
+
+ val dataSenderCount = 5
+ val dataEntryCount = 5
+
+ val dataSenders = IndexedSeq.tabulate(dataSenderCount)(i => TxHelpers.signer(1550 + i))
+ domain.appendBlock(
+ TxHelpers
+ .massTransfer(
+ richAccount,
+ dataSenders.map(kp => kp.toAddress -> 100.waves) ++
+ Seq(oracleAccount.toAddress -> 100.waves, dappAccount.toAddress -> 10.waves),
+ fee = 0.05.waves
+ ),
+ TxHelpers.setScript(
+ dappAccount,
+ TestCompiler(V7).compileContract(s"""
+ let oracleAddress = Address(base58'${oracleAccount.toAddress}')
+ @Callable(i)
+ func default() = [
+ IntegerEntry("loadedHeight_" + height.toString() + i.transactionId.toBase58String(), oracleAddress.getIntegerValue("lastUpdatedBlock"))
+ ]
+ """)
+ ),
+ TxHelpers.data(oracleAccount, Seq(IntegerDataEntry("lastUpdatedBlock", 2)))
+ )
+ domain.appendBlock(dataSenders.map(kp => TxHelpers.data(kp, Seq.tabulate(dataEntryCount)(i => IntegerDataEntry("kv_" + i, 501)), 0.01.waves))*)
+ domain.appendBlock(dataSenders.map(kp => TxHelpers.data(kp, Seq.tabulate(dataEntryCount)(i => IntegerDataEntry("kv_" + i, 503)), 0.01.waves))*)
+ domain.appendBlock(
+ (dataSenders.map(kp => TxHelpers.data(kp, Seq.tabulate(dataEntryCount)(i => IntegerDataEntry("kv_" + i, 504)), 0.01.waves)) ++
+ Seq(
+ TxHelpers.invoke(dappAccount.toAddress, invoker = richAccount),
+ TxHelpers.data(oracleAccount, Seq(IntegerDataEntry("lastUpdatedBlock", 5)))
+ ))*
+ )
+ domain.appendBlock()
+ val discardedBlocks = domain.rollbackTo(domain.blockchain.blockId(domain.blockchain.height - 2).get)
+ discardedBlocks.foreach { case (block, _, _) =>
+ domain.appendBlock(block)
+ }
+ }
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 8cce403945c..e169b401214 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -32,19 +32,19 @@ object Dependencies {
val janino = "org.codehaus.janino" % "janino" % "3.1.11"
val asyncHttpClient = "org.asynchttpclient" % "async-http-client" % "2.12.3"
val curve25519 = "com.wavesplatform" % "curve25519-java" % "0.6.6"
- val nettyHandler = "io.netty" % "netty-handler" % "4.1.104.Final"
+ val nettyHandler = "io.netty" % "netty-handler" % "4.1.106.Final"
val shapeless = Def.setting("com.chuusai" %%% "shapeless" % "2.3.10")
- val playJson = "com.typesafe.play" %% "play-json" % "2.10.3" // 2.10.x and later is built for Java 11
+ val playJson = "com.typesafe.play" %% "play-json" % "2.10.4"
val scalaTest = "org.scalatest" %% "scalatest" % "3.2.17" % Test
val scalaJsTest = Def.setting("com.lihaoyi" %%% "utest" % "0.8.2" % Test)
- val sttp3 = "com.softwaremill.sttp.client3" % "core_2.13" % "3.9.1" // 3.6.x and later is built for Java 11
- val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.9.1"
+ val sttp3 = "com.softwaremill.sttp.client3" % "core_2.13" % "3.9.2"
+ val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.9.2"
- val bouncyCastleProvider = "org.bouncycastle" % s"bcprov-jdk15on" % "1.70"
+ val bouncyCastleProvider = "org.bouncycastle" % s"bcprov-jdk18on" % "1.77"
val console = Seq("com.github.scopt" %% "scopt" % "4.1.0")
@@ -69,7 +69,7 @@ object Dependencies {
curve25519,
bouncyCastleProvider,
"com.wavesplatform" % "zwaves" % "0.2.1",
- web3jModule("crypto")
+ web3jModule("crypto").excludeAll(ExclusionRule("org.bouncycastle", "bcprov-jdk15on")),
) ++ langCompilerPlugins.value ++ scalapbRuntime.value ++ protobuf.value
)
@@ -100,7 +100,7 @@ object Dependencies {
akkaModule("slf4j") % Runtime
)
- private val rocksdb = "org.rocksdb" % "rocksdbjni" % "8.9.1"
+ private val rocksdb = "org.rocksdb" % "rocksdbjni" % "8.10.0"
lazy val node = Def.setting(
Seq(
@@ -127,10 +127,10 @@ object Dependencies {
monixModule("reactive").value,
nettyHandler,
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.5",
- "eu.timepit" %% "refined" % "0.11.0" exclude ("org.scala-lang.modules", "scala-xml_2.13"),
+ "eu.timepit" %% "refined" % "0.11.1" exclude ("org.scala-lang.modules", "scala-xml_2.13"),
"com.esaulpaugh" % "headlong" % "10.0.2",
"com.github.jbellis" % "jamm" % "0.4.0", // Weighing caches
- web3jModule("abi"),
+ web3jModule("abi").excludeAll(ExclusionRule("org.bouncycastle", "bcprov-jdk15on")),
akkaModule("testkit") % Test,
akkaHttpModule("akka-http-testkit") % Test
) ++ test ++ console ++ logDeps ++ protobuf.value ++ langCompilerPlugins.value
@@ -161,8 +161,6 @@ object Dependencies {
lazy val rideRunner = Def.setting(
Seq(
rocksdb,
- // https://github.com/netty/netty/wiki/Native-transports
- // "io.netty" % "netty-transport-native-epoll" % "4.1.79.Final" classifier "linux-x86_64",
"com.github.ben-manes.caffeine" % "caffeine" % "3.1.8",
"net.logstash.logback" % "logstash-logback-encoder" % "7.4" % Runtime,
kamonModule("caffeine"),
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 7cb18557cca..9c052045cf2 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -6,7 +6,7 @@ resolvers ++= Seq(
// Should go before Scala.js
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.6")
-libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.14"
+libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.15"
Seq(
"com.eed3si9n" % "sbt-assembly" % "2.1.5",
diff --git a/repl/jvm/src/test/logback-test.xml b/repl/jvm/src/test/logback-test.xml
deleted file mode 100644
index 8b4e22b2f22..00000000000
--- a/repl/jvm/src/test/logback-test.xml
+++ /dev/null
@@ -1,14 +0,0 @@
-
-
-
-
- %date %-5level [%.15thread] %logger{26} - %msg%n
-
-
-
-
-
-
-
-
-
diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyTags.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyTags.scala
index 919ddd1bd63..fdd474f5b53 100644
--- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyTags.scala
+++ b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyTags.scala
@@ -53,7 +53,6 @@ object KeyTags extends Enumeration {
AssetStaticInfo,
NftCount,
NftPossession,
- BloomFilterChecksum,
IssuedAssets,
UpdatedAssets,
SponsoredAssets,