diff --git a/.github/workflows/check-pr.yaml b/.github/workflows/check-pr.yaml index 9cd4f4abf6b..6fc8704af49 100644 --- a/.github/workflows/check-pr.yaml +++ b/.github/workflows/check-pr.yaml @@ -53,9 +53,9 @@ jobs: SONATYPE_USERNAME: ${{ secrets.OSSRH_USERNAME }} PGP_PASSPHRASE: ${{ secrets.OSSRH_GPG_PASSPHRASE }} run: | - sbt_version=$(cut -d\" -f2 version.sbt) + version=$(echo ${{ github.base_ref }} | awk -F '[-\.]' '{print $2"."$3}') pr_number=${{ github.event.number }} - sbt -Dproject.version=$sbt_version-$pr_number-SNAPSHOT --mem 4096 --batch publishSigned + sbt -Dproject.version=$version-$pr_number-SNAPSHOT --mem 4096 --batch publishSigned - name: Save debug data uses: actions/upload-artifact@v4 if: always() diff --git a/.github/workflows/push-default-branch.yml b/.github/workflows/push-default-branch.yml new file mode 100644 index 00000000000..58e27ce56e7 --- /dev/null +++ b/.github/workflows/push-default-branch.yml @@ -0,0 +1,34 @@ +name: Publish Artifacts + +on: + push: + branches: + - 'version-1.*.*' + +jobs: + update-graph: + name: Update Dependency Graph + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: scalacenter/sbt-dependency-submission@v3 + publish-snapshot: + name: Publish Snapshots to Sonatype + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-java@v4 + with: + distribution: 'temurin' + java-version: '11' + cache: 'sbt' + gpg-private-key: ${{ secrets.OSSRH_GPG_KEY }} + gpg-passphrase: ${{ secrets.OSSRH_GPG_PASSPHRASE }} + - name: Publish snapshots + env: + SONATYPE_PASSWORD: ${{ secrets.OSSRH_PASSWORD }} + SONATYPE_USERNAME: ${{ secrets.OSSRH_USERNAME }} + PGP_PASSPHRASE: ${{ secrets.OSSRH_GPG_PASSPHRASE }} + run: | + version=$(echo ${{ github.ref_name }} | awk -F '[-\.]' '{print $2"."$3}')-SNAPSHOT + sbt -Dproject.version=$version --mem 4096 --batch publishSigned diff --git a/build.sbt b/build.sbt index ec0746e282b..f30f3432938 100644 --- a/build.sbt +++ b/build.sbt @@ -13,7 +13,7 @@ 
enablePlugins(GitVersioning) git.uncommittedSignifier := Some("DIRTY") git.useGitDescribe := true ThisBuild / git.useGitDescribe := true -ThisBuild / PB.protocVersion := "3.25.1" // https://protobuf.dev/support/version-support/#java +ThisBuild / PB.protocVersion := "3.25.5" // https://protobuf.dev/support/version-support/#java lazy val lang = crossProject(JSPlatform, JVMPlatform) @@ -145,7 +145,7 @@ lazy val `waves-node` = (project in file(".")) inScope(Global)( Seq( - scalaVersion := "2.13.14", + scalaVersion := "2.13.15", organization := "com.wavesplatform", organizationName := "Waves Platform", organizationHomepage := Some(url("https://wavesplatform.com")), diff --git a/docker/Dockerfile b/docker/Dockerfile index c9ee2f66f04..9c1f4a7dbb8 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,27 +1,17 @@ -FROM eclipse-temurin:11-jre-jammy +FROM eclipse-temurin:11-jre-noble ENV WAVES_LOG_LEVEL=INFO ENV WAVES_HEAP_SIZE=2g ENV WAVES_NETWORK=mainnet -ENV YOURKIT_VERSION=2023.5 - SHELL ["/bin/bash", "-c"] -# Additional dependencies -RUN apt-get update && apt-get install -y wget unzip gosu || exit 1; \ - export YOURKIT_ARCHIVE="YourKit-JavaProfiler-$YOURKIT_VERSION-docker.zip"; \ - wget --quiet "https://www.yourkit.com/download/docker/$YOURKIT_ARCHIVE" -P /tmp/ && unzip /tmp/$YOURKIT_ARCHIVE -d /usr/local || exit 1; \ - # Clean - apt-get remove -y wget unzip && apt-get autoremove -y && apt-get autoclean && rm -rf /var/lib/apt/lists/* - # Node files ENV WVDATA=/var/lib/waves ENV WVLOG=/var/log/waves ENV WAVES_INSTALL_PATH=/usr/share/waves ENV WAVES_CONFIG=/etc/waves/waves.conf COPY target /tmp/ -COPY waves.conf.template $WAVES_CONFIG # Setup node COPY entrypoint.sh $WAVES_INSTALL_PATH/bin/entrypoint.sh diff --git a/docker/README.md b/docker/README.md index 2ea800ff610..031b2a6d658 100644 --- a/docker/README.md +++ b/docker/README.md @@ -28,39 +28,39 @@ It is highly recommended to read more about [Waves Node configuration](https://d ## Running Docker image + + ### 
Configuration options 1. The image supports Waves Node config customization. To change a config field use corresponding JVM options. JVM options can be sent to JVM using `JAVA_OPTS` environment variable. Please refer to ([complete configuration file](https://github.com/wavesplatform/Waves/blob/master/node/src/main/resources/application.conf)) to get the full path of the configuration item you want to change. -``` -docker run -v /docker/waves/waves-data:/var/lib/waves -v /docker/waves/waves-config:/etc/waves -p 6869:6869 -p 6862:6862 -e JAVA_OPTS="-Dwaves.rest-api.enable=yes -Dwaves.wallet.password=myWalletSuperPassword" -ti wavesplatform/wavesnode -``` + ``` + docker run -v /docker/waves/waves-data:/var/lib/waves -v /docker/waves/waves-config:/etc/waves -p 6869:6869 -p 6862:6862 -e JAVA_OPTS="-Dwaves.rest-api.enable=yes -Dwaves.wallet.password=myWalletSuperPassword" -ti wavesplatform/wavesnode + ``` -2. Waves Node is looking for a config in the directory `/etc/waves/waves.conf` which can be mounted using Docker volumes. During image build, a default configuration will be copied to this directory. While running container if the value of `WAVES_NETWORK` is not `mainnet`, `testnet` or `stagenet`, default configuration won't be enough for correct node working. This is a scenario of using `CUSTOM` network - correct configuration must be provided when running container. If you use `CUSTOM` network and `/etc/waves/waves.conf` is NOT found Waves Node container will exit. +2. Waves Node is looking for a config in the directory `/etc/waves/waves.conf` which can be mounted using Docker volumes. For custom networks, correct configuration file must be provided when running container. If you use `CUSTOM` network and `/etc/waves/waves.conf` is NOT found Waves Node container will exit. -3. By default, `/etc/waves/waves.conf` config includes `/etc/waves/local.conf`. Custom `/etc/waves/local.conf` can be used to override default config entries. 
Custom `/etc/waves/waves.conf` can be used to override or the whole configuration. For additional information about Docker volumes mapping please refer to `Managing data` item. +3. You can use custom config to override or replace the whole configuration. For additional information about Docker volumes mapping please refer to `Managing data` item. 4. You can override the default executable by using the following syntax: -``` -docker run -it wavesplatform/wavesnode [command] [args] -``` + ``` + docker run -it wavesplatform/wavesnode [command] [args] + ``` ### Environment variables -**You can run container with predefined environment variables:** - -| Env variable | Description | -|-----------------------------------|--------------| -| `WAVES_WALLET_SEED` | Base58 encoded seed. Overrides `-Dwaves.wallet.seed` JVM config option. | -| `WAVES_WALLET_PASSWORD` | Password for the wallet file. Overrides `-Dwaves.wallet.password` JVM config option. | -| `WAVES_LOG_LEVEL` | Node logging level. Available values: `OFF`, `ERROR`, `WARN`, `INFO`, `DEBUG`, `TRACE`. More details about logging are available [here](https://docs.waves.tech/en/waves-node/logging-configuration).| -| `WAVES_HEAP_SIZE` | Default Java Heap Size limit in -X Command-line Options notation (`-Xms=[your value]`). More details [here](https://docs.oracle.com/cd/E13150_01/jrockit_jvm/jrockit/jrdocs/refman/optionX.html). | -|`WAVES_NETWORK` | Waves Blockchain network. Available values are `mainnet`, `testnet`, `stagenet`.| -|`JAVA_OPTS` | Additional Waves Node JVM configuration options. 
| +The following environment variables can be passed to the container: -**Note: All variables are optional.** +| Env variable | Description | +|-------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `WAVES_WALLET_SEED` | Base58 encoded seed, sets `-Dwaves.wallet.seed` system property. | +| `WAVES_WALLET_PASSWORD` | Password for the wallet file, sets `-Dwaves.wallet.password` system property. | +| `WAVES_LOG_LEVEL` | Node stdout logging level. Available values: `OFF`, `ERROR`, `WARN`, `INFO`, `DEBUG`, `TRACE`. More details about logging are available [here](https://docs.waves.tech/en/waves-node/logging-configuration). | +| `WAVES_HEAP_SIZE` | Default Java Heap Size limit in -X Command-line Options notation (`-Xmx=[your value]`). More details [here](https://docs.oracle.com/cd/E13150_01/jrockit_jvm/jrockit/jrdocs/refman/optionX.html). | +| `WAVES_NETWORK` | Waves Blockchain network. Available values are `mainnet`, `testnet`, `stagenet`. | +| `JAVA_OPTS` | Additional Waves Node JVM configuration options. | -**Note: Environment variables override values in the configuration file.** +All environment variables are optional, however you need to specify at least the desired network and wallet password (via environment variables, additional system properties defined in the `JAVA_OPTS` environment variable, or in the config file). ### Managing data We recommend to store the blockchain state as well as Waves configuration on the host side. 
As such, consider using Docker volumes mapping to map host directories inside the container: diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 1fd46715e46..9d01196ea01 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -1,29 +1,30 @@ #!/bin/bash -shopt -s nullglob -logEcho() { - echo $1 | gosu waves tee -a /var/log/waves/waves.log -} -[ -n "${YOURKIT_OPTS}" ] && JAVA_OPTS="$JAVA_OPTS -agentpath:/usr/local/YourKit-JavaProfiler-$YOURKIT_VERSION/bin/linux-x86-64/libyjpagent.so=$YOURKIT_OPTS" -JAVA_OPTS="${JAVA_OPTS} -Dwaves.defaults.blockchain.type=$WAVES_NETWORK -Dwaves.defaults.directory=$WVDATA" - -logEcho "Node is starting..." -logEcho "WAVES_HEAP_SIZE='${WAVES_HEAP_SIZE}'" -logEcho "WAVES_LOG_LEVEL='${WAVES_LOG_LEVEL}'" -logEcho "JAVA_OPTS='${JAVA_OPTS}'" - -JAVA_OPTS="-Dlogback.stdout.level=${WAVES_LOG_LEVEL} - -XX:+ExitOnOutOfMemoryError +JAVA_OPTS="-XX:+ExitOnOutOfMemoryError -Xmx${WAVES_HEAP_SIZE} - -Dlogback.file.directory=$WVLOG - -Dconfig.override_with_env_vars=true + --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED + -Dlogback.stdout.level=${WAVES_LOG_LEVEL} + -Dlogback.file.directory=${WVLOG} + -Dwaves.config.directory=/etc/waves + -Dwaves.defaults.blockchain.type=${WAVES_NETWORK} + -Dwaves.directory=${WVDATA} + -Dwaves.rest-api.bind-address=0.0.0.0 ${JAVA_OPTS}" -if [ $# -eq 0 ] - then - ARGS="$WAVES_CONFIG" - else - ARGS=$@ +echo "JAVA_OPTS=${JAVA_OPTS}" | tee -a ${WVLOG}/waves.log + +if [ -n "$WAVES_WALLET_SEED" ] ; then + JAVA_OPTS="-Dwaves.wallet.seed=${WAVES_WALLET_SEED} ${JAVA_OPTS}" +fi + +if [ -n "$WAVES_WALLET_PASSWORD" ] ; then + JAVA_OPTS="-Dwaves.wallet.password=${WAVES_WALLET_PASSWORD} ${JAVA_OPTS}" +fi + +if [ $# -eq 0 ] && [ -f /etc/waves/waves.conf ] ; then + ARGS="/etc/waves/waves.conf" +else + ARGS=$@ fi -exec java $JAVA_OPTS -cp "${WAVES_INSTALL_PATH}/lib/plugins/*:$WAVES_INSTALL_PATH/lib/*" com.wavesplatform.Application $ARGS +exec java $JAVA_OPTS -cp 
"$WAVES_INSTALL_PATH/lib/plugins/*:$WAVES_INSTALL_PATH/lib/*" com.wavesplatform.Application $ARGS diff --git a/docker/waves.conf.template b/docker/waves.conf.template deleted file mode 100644 index 14efef70fba..00000000000 --- a/docker/waves.conf.template +++ /dev/null @@ -1,10 +0,0 @@ -waves { - directory = ${WVDATA} - wallet { - seed = ${?WAVES_WALLET_SEED} - password = ${?WAVES_WALLET_PASSWORD} - } - rest-api.bind-address = "0.0.0.0" -} - -include "local.conf" diff --git a/lang/shared/src/main/scala/com/wavesplatform/lang/v1/parser/Parser.scala b/lang/shared/src/main/scala/com/wavesplatform/lang/v1/parser/Parser.scala index 3d5b28789a0..c1a75b9c31a 100644 --- a/lang/shared/src/main/scala/com/wavesplatform/lang/v1/parser/Parser.scala +++ b/lang/shared/src/main/scala/com/wavesplatform/lang/v1/parser/Parser.scala @@ -48,8 +48,9 @@ class Parser(stdLibVersion: StdLibVersion)(implicit offset: LibrariesOffset) { def unusedText[A: P] = comment ~ directive ~ comment def escapedUnicodeSymbolP[A: P]: P[(Int, String, Int)] = P(Index ~~ (NoCut(unicodeSymbolP) | specialSymbols).! ~~ Index) + def escapedUnicodeOrEndOfString[A: P]: P[Any] = escapedUnicodeSymbolP[A] | notEndOfString def stringP[A: P]: P[EXPR] = - P(Index ~~ "\"" ~/ Pass ~~ (escapedUnicodeSymbolP | notEndOfString).!.repX ~~ "\"" ~~ Index) + P(Index ~~ "\"" ~/ Pass ~~ (escapedUnicodeOrEndOfString).!.repX ~~ "\"" ~~ Index) .map { case (start, xs, end) => var errors = Vector.empty[String] val consumedString = new StringBuilder @@ -255,9 +256,9 @@ class Parser(stdLibVersion: StdLibVersion)(implicit offset: LibrariesOffset) { def funcBody = singleBaseExpr def correctFunc = Index ~~ funcKWAndName ~ comment ~/ args(min = 0) ~ ("=" ~ funcBody | "=" ~/ Fail.opaque("function body")) ~~ Index def noKeyword = { - def noArgs = "(" ~ comment ~ ")" ~ comment - def validName = NoCut(funcName).filter(_.isInstanceOf[VALID[?]]) - def argsOrEqual = (NoCut(args(min = 1)) ~ "=".?) 
| (noArgs ~ "=" ~~ !"=") + def noArgs = "(" ~ comment ~ ")" ~ comment + def validName = NoCut(funcName).filter(_.isInstanceOf[VALID[?]]) + def argsOrEqual: P[Any] = (NoCut(args(min = 1)) ~ "=".?) | (noArgs ~ "=" ~~ !"=") (validName ~ comment ~ argsOrEqual ~/ funcBody.? ~~ Fail) .asInstanceOf[P[Nothing]] .opaque(""""func" keyword""") diff --git a/node-it/src/test/scala/com/wavesplatform/it/Docker.scala b/node-it/src/test/scala/com/wavesplatform/it/Docker.scala index 82114f86b93..04f496b7c07 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/Docker.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/Docker.scala @@ -27,7 +27,7 @@ import org.asynchttpclient.Dsl.* import java.io.{FileOutputStream, IOException} import java.net.{InetAddress, InetSocketAddress, URL} import java.nio.file.{Files, Path, Paths} -import java.time.LocalDateTime +import java.time.{LocalDateTime, Duration as JDuration} import java.time.format.DateTimeFormatter import java.util.Collections.* import java.util.concurrent.ConcurrentHashMap @@ -58,9 +58,9 @@ class Docker( .setMaxConnections(18) .setMaxConnectionsPerHost(3) .setMaxRequestRetry(1) - .setReadTimeout(10000) + .setReadTimeout(JDuration.ofSeconds(10)) .setKeepAlive(false) - .setRequestTimeout(10000) + .setRequestTimeout(JDuration.ofSeconds(10)) ) private val client = DefaultDockerClient.fromEnv().build() diff --git a/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala b/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala index 0cbee5b60ce..4e2f4bf832a 100644 --- a/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala +++ b/node-it/src/test/scala/com/wavesplatform/it/api/AsyncHttpApi.scala @@ -4,6 +4,7 @@ import java.io.IOException import java.net.{InetSocketAddress, URLEncoder} import java.util.concurrent.TimeoutException import java.util.{NoSuchElementException, UUID} +import java.time.{Duration as JDuration} import com.google.protobuf.ByteString import 
com.wavesplatform.account.{AddressOrAlias, AddressScheme, KeyPair, SeedKeyPair} import com.wavesplatform.api.http.DebugMessage.* @@ -183,8 +184,8 @@ object AsyncHttpApi extends Assertions { def request = _get(s"${n.nodeApiEndpoint}/blocks/height?${System.currentTimeMillis()}") - .setReadTimeout(timeout) - .setRequestTimeout(timeout) + .setReadTimeout(JDuration.ofMillis(timeout)) + .setRequestTimeout(JDuration.ofMillis(timeout)) .build() def send(): Future[Option[Response]] = diff --git a/node/src/main/scala/com/wavesplatform/Application.scala b/node/src/main/scala/com/wavesplatform/Application.scala index 886ac6b1a17..a954f7385fc 100644 --- a/node/src/main/scala/com/wavesplatform/Application.scala +++ b/node/src/main/scala/com/wavesplatform/Application.scala @@ -567,7 +567,7 @@ object Application extends ScorexLogging { .map(_.toUpperCase) .getOrElse("TESTNET") - log.warn(s"Config file not defined, default $currentBlockchainType config will be used") + log.info(s"Config file not specified, default $currentBlockchainType config will be used") case Failure(exception) => log.error(s"Couldn't read ${external.get.toPath.toAbsolutePath}", exception) forceStopApplication(Misconfiguration) diff --git a/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala b/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala index bfbc56bd260..4fb8afb8d88 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/AddressTransactions.scala @@ -135,7 +135,7 @@ object AddressTransactions { types: Set[Transaction.Type] ) extends AbstractIterator[Seq[(TxMeta, Transaction, Option[TxNum])]] { private val seqNr = db.get(Keys.addressTransactionSeqNr(addressId, apiHandle)) - db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr, apiHandle).keyBytes))() + db.withSafePrefixIterator(_.seekForPrev(Keys.addressTransactionHN(addressId, seqNr, 
apiHandle).keyBytes))(()) final override def computeNext(): Seq[(TxMeta, Transaction, Option[TxNum])] = db.withSafePrefixIterator { dbIterator => val keysBuffer = new ArrayBuffer[Key[Option[(TxMeta, Transaction)]]]() diff --git a/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala b/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala index 4faaf1b6c13..299d738c27b 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/CommonAccountsApi.scala @@ -141,7 +141,7 @@ object CommonAccountsApi { private val length: Int = entriesFromDiff.length - db.withSafePrefixIterator(_.seek(prefix))() + db.withSafePrefixIterator(_.seek(prefix))(()) private var nextIndex = 0 private var nextDbEntry: Option[DataEntry[?]] = None diff --git a/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala b/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala index f9821a7d214..97211b5b575 100644 --- a/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala +++ b/node/src/main/scala/com/wavesplatform/api/common/lease/LeaseByAddressIterator.scala @@ -9,7 +9,7 @@ import com.wavesplatform.state.LeaseDetails private class LeaseByAddressIterator(resource: DBResource, apiHandle: RDB.ApiHandle, addressId: AddressId) extends AbstractIterator[Seq[(ByteStr, LeaseDetails)]] { private val seqNr = resource.get(Keys.addressLeaseSeqNr(addressId, apiHandle)) - resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr, apiHandle).keyBytes))() + resource.withSafePrefixIterator(_.seekForPrev(Keys.addressLeaseSeq(addressId, seqNr, apiHandle).keyBytes))(()) final override def computeNext(): Seq[(ByteStr, LeaseDetails)] = resource.withSafePrefixIterator { iterator => diff --git a/node/src/main/scala/com/wavesplatform/database/DBResource.scala 
b/node/src/main/scala/com/wavesplatform/database/DBResource.scala index 7577b8047cf..0d5afe026ac 100644 --- a/node/src/main/scala/com/wavesplatform/database/DBResource.scala +++ b/node/src/main/scala/com/wavesplatform/database/DBResource.scala @@ -5,56 +5,53 @@ import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB, RocksIterator} import scala.collection.View import scala.collection.mutable.ArrayBuffer -trait DBResource extends AutoCloseable { - def get[V](key: Key[V]): V - def get(key: Array[Byte]): Array[Byte] - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] - def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] - def prefixIterator: RocksIterator // Should have a single instance - def fullIterator: RocksIterator // Should have a single instance - def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A - def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A -} - -object DBResource { - def apply(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None): DBResource = new DBResource { - private[this] val snapshot = db.getSnapshot - // checksum may be verification is **very** expensive, so it's explicitly disabled - private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) +class DBResource(db: RocksDB, iteratorCfHandle: Option[ColumnFamilyHandle] = None) extends AutoCloseable { + private[this] val snapshot = db.getSnapshot + // checksum verification is **very** expensive, so it's explicitly disabled + private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) - override def get[V](key: Key[V]): V = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes)) + def get[V](key: Key[V]): V = 
key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes)) - override def get(key: Array[Byte]): Array[Byte] = db.get(readOptions, key) + def get(key: Array[Byte]): Array[Byte] = db.get(readOptions, key) - override def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] = - db.multiGetFlat(readOptions, keys, valBufferSizes) + def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] = + db.multiGetFlat(readOptions, keys, valBufferSizes) - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] = - db.multiGet(readOptions, keys, valBufferSizes) + def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] = + db.multiGet(readOptions, keys, valBufferSizes) - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] = - db.multiGet(readOptions, keys, valBufferSize) + def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] = + db.multiGet(readOptions, keys, valBufferSize) - override lazy val prefixIterator: RocksIterator = - db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) + @volatile private var prefixIteratorWasOpened = false + /** + * Finds the exact key for iter.seek(key) if key.length < 10 and becomes invalid on iter.next(). + * Works as intended if prefix(key).length >= 10. 
+ * @see RDB.newColumnFamilyOptions + */ + lazy val prefixIterator: RocksIterator = { + prefixIteratorWasOpened = true + db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) + } - override lazy val fullIterator: RocksIterator = - db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true)) + @volatile private var fullIteratorWasOpened = false + lazy val fullIterator: RocksIterator = { + fullIteratorWasOpened = true + db.newIterator(iteratorCfHandle.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true)) + } - override def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized { - if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed - } + def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized { + if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed + } - override def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = fullIterator.synchronized { - if (fullIterator.isOwningHandle) ifNotClosed(fullIterator) else ifClosed - } + def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = fullIterator.synchronized { + if (fullIterator.isOwningHandle) ifNotClosed(fullIterator) else ifClosed + } - override def close(): Unit = { - prefixIterator.synchronized(prefixIterator.close()) - fullIterator.synchronized(fullIterator.close()) - db.releaseSnapshot(snapshot) - readOptions.close() - } + override def close(): Unit = { + if (prefixIteratorWasOpened) prefixIterator.synchronized(prefixIterator.close()) + if (fullIteratorWasOpened) fullIterator.synchronized(fullIterator.close()) + db.releaseSnapshot(snapshot) + readOptions.close() } } diff --git a/node/src/main/scala/com/wavesplatform/database/package.scala 
b/node/src/main/scala/com/wavesplatform/database/package.scala index 36d6ce86319..59a768b73fa 100644 --- a/node/src/main/scala/com/wavesplatform/database/package.scala +++ b/node/src/main/scala/com/wavesplatform/database/package.scala @@ -540,19 +540,19 @@ package object database { } def resourceObservable: Observable[DBResource] = - Observable.resource(Task(DBResource(db, None)))(r => Task(r.close())) + Observable.resource(Task(new DBResource(db, None)))(r => Task(r.close())) def resourceObservable(iteratorCfHandle: ColumnFamilyHandle): Observable[DBResource] = - Observable.resource(Task(DBResource(db, Some(iteratorCfHandle))))(r => Task(r.close())) + Observable.resource(Task(new DBResource(db, Some(iteratorCfHandle))))(r => Task(r.close())) def withResource[A](f: DBResource => A): A = { - val resource = DBResource(db) + val resource = new DBResource(db) try f(resource) finally resource.close() } def withResource[A](iteratorCfHandle: ColumnFamilyHandle)(f: DBResource => A): A = { - val resource = DBResource(db, Some(iteratorCfHandle)) + val resource = new DBResource(db, Some(iteratorCfHandle)) try f(resource) finally resource.close() } diff --git a/node/tests/src/test/scala/com/wavesplatform/state/diffs/ci/sync/SyncDAppComplexityCountTest.scala b/node/tests/src/test/scala/com/wavesplatform/state/diffs/ci/sync/SyncDAppComplexityCountTest.scala index a5e0a535087..669e7bd2d8d 100644 --- a/node/tests/src/test/scala/com/wavesplatform/state/diffs/ci/sync/SyncDAppComplexityCountTest.scala +++ b/node/tests/src/test/scala/com/wavesplatform/state/diffs/ci/sync/SyncDAppComplexityCountTest.scala @@ -212,9 +212,9 @@ class SyncDAppComplexityCountTest extends PropSpec with WithDomain { val expectedPortfolios = if (exceeding || raiseError) basePortfolios else basePortfolios |+| additionalPortfolios expectedPortfolios .foreach { case (address, expectedPortfolio) => - expectedPortfolio.balance shouldBe snapshot.balances.get((address, Waves)).map(_ - 
db.balance(address)).getOrElse(0) + expectedPortfolio.balance shouldBe snapshot.balances.get((address, Waves)).map(_ - db.balance(address)).getOrElse(0L) expectedPortfolio.assets.foreach { case (asset, balance) => - balance shouldBe snapshot.balances.get((address, asset)).map(_ - db.balance(address, asset)).getOrElse(0) + balance shouldBe snapshot.balances.get((address, asset)).map(_ - db.balance(address, asset)).getOrElse(0L) } } } diff --git a/node/tests/src/test/scala/com/wavesplatform/state/diffs/smart/predef/CommonFunctionsTest.scala b/node/tests/src/test/scala/com/wavesplatform/state/diffs/smart/predef/CommonFunctionsTest.scala index 7c52bf51ab5..de636115009 100644 --- a/node/tests/src/test/scala/com/wavesplatform/state/diffs/smart/predef/CommonFunctionsTest.scala +++ b/node/tests/src/test/scala/com/wavesplatform/state/diffs/smart/predef/CommonFunctionsTest.scala @@ -140,7 +140,7 @@ class CommonFunctionsTest extends PropSpec { TxHelpers.issue(version = TxVersion.V1), createMassTransfer() ).foreach { tx => - Try[Either[String, ?]] { + Try { runScript( s""" |let t = 100 @@ -153,7 +153,7 @@ class CommonFunctionsTest extends PropSpec { |""".stripMargin, Coproduct(tx) ) - }.recover { + }.recover[Any] { case ex: MatchError => Assertions.assert(ex.getMessage().contains("Compilation failed: Value 't' already defined in the scope")) case _: Throwable => Assertions.fail("Some unexpected error") @@ -171,7 +171,7 @@ class CommonFunctionsTest extends PropSpec { | } |""".stripMargin ) - }.recover { + }.recover[Any] { case ex: MatchError => Assertions.assert(ex.getMessage().contains("Compilation failed: A definition of 'p' is not found")) case _: Throwable => Assertions.fail("Some unexpected error") } @@ -274,7 +274,7 @@ class CommonFunctionsTest extends PropSpec { (s"Addr(base58'$realAddr')", "Can't find a function 'Addr'") ) for ((clause, err) <- cases) { - Try[Either[String, ?]] { + Try { runScript( s""" |match tx { @@ -285,7 +285,7 @@ class CommonFunctionsTest extends 
PropSpec { |} |""".stripMargin ) - }.recover { + }.recover[Any] { case ex: MatchError => Assertions.assert(ex.getMessage().contains(err)) case e: Throwable => Assertions.fail("Unexpected error", e) } diff --git a/node/tests/src/test/scala/com/wavesplatform/transaction/smart/EthereumTransactionSpec.scala b/node/tests/src/test/scala/com/wavesplatform/transaction/smart/EthereumTransactionSpec.scala index c3d1e65655e..44dcb945148 100644 --- a/node/tests/src/test/scala/com/wavesplatform/transaction/smart/EthereumTransactionSpec.scala +++ b/node/tests/src/test/scala/com/wavesplatform/transaction/smart/EthereumTransactionSpec.scala @@ -106,7 +106,7 @@ class EthereumTransactionSpec val assetTransfer = EthTxGenerator.generateEthTransfer(senderAccount, recipientAddress, Long.MaxValue, TestAsset) differ(assetTransfer).balances shouldBe Map( - (senderAddress, TestAsset) -> 0, + (senderAddress, TestAsset) -> 0L, (senderAddress, Waves) -> (LongMaxMinusFee + transfer.fee.longValue()), (recipientAddress, TestAsset) -> Long.MaxValue ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 687b2780744..95bbf073f9e 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -20,29 +20,29 @@ object Dependencies { def monixModule(module: String): Def.Initialize[ModuleID] = Def.setting("io.monix" %%% s"monix-$module" % "3.4.1") - private def grpcModule(module: String) = "io.grpc" % module % "1.62.2" + private def grpcModule(module: String) = "io.grpc" % module % "1.68.0" val kindProjector = compilerPlugin("org.typelevel" % "kind-projector" % "0.13.3" cross CrossVersion.full) val akkaHttp = akkaHttpModule("akka-http") - val googleGuava = "com.google.guava" % "guava" % "33.2.1-jre" + val googleGuava = "com.google.guava" % "guava" % "33.3.1-jre" val kamonCore = kamonModule("core") val machinist = "org.typelevel" %% "machinist" % "0.6.8" - val logback = "ch.qos.logback" % "logback-classic" % "1.5.6" + val logback = "ch.qos.logback" % "logback-classic" % 
"1.5.8" val janino = "org.codehaus.janino" % "janino" % "3.1.12" - val asyncHttpClient = "org.asynchttpclient" % "async-http-client" % "2.12.3" + val asyncHttpClient = "org.asynchttpclient" % "async-http-client" % "3.0.0" val curve25519 = "com.wavesplatform" % "curve25519-java" % "0.6.6" - val nettyHandler = "io.netty" % "netty-handler" % "4.1.100.Final" + val nettyHandler = "io.netty" % "netty-handler" % "4.1.110.Final" val shapeless = Def.setting("com.chuusai" %%% "shapeless" % "2.3.12") val playJson = "com.typesafe.play" %% "play-json" % "2.10.6" val scalaTest = "org.scalatest" %% "scalatest" % "3.2.19" % Test - val scalaJsTest = Def.setting("com.lihaoyi" %%% "utest" % "0.8.3" % Test) + val scalaJsTest = Def.setting("com.lihaoyi" %%% "utest" % "0.8.4" % Test) - val sttp3 = "com.softwaremill.sttp.client3" % "core_2.13" % "3.9.7" - val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.9.7" + val sttp3 = "com.softwaremill.sttp.client3" % "core_2.13" % "3.9.8" + val sttp3Monix = "com.softwaremill.sttp.client3" %% "monix" % "3.9.8" val bouncyCastleProvider = "org.bouncycastle" % s"bcprov-jdk18on" % "1.78.1" @@ -60,10 +60,10 @@ object Dependencies { // defined here because %%% can only be used within a task or setting macro // explicit dependency can likely be removed when monix 3 is released monixModule("eval").value, - "org.typelevel" %%% s"cats-core" % "2.10.0", - "com.lihaoyi" %%% "fastparse" % "3.0.2", + "org.typelevel" %%% s"cats-core" % "2.12.0", + "com.lihaoyi" %%% "fastparse" % "3.1.1", shapeless.value, - "org.typelevel" %%% "cats-mtl" % "1.4.0", + "org.typelevel" %%% "cats-mtl" % "1.5.0", "ch.obermuhlner" % "big-math" % "2.3.2", googleGuava, // BaseEncoding.base16() curve25519, @@ -77,14 +77,14 @@ object Dependencies { logback, "com.github.jnr" % "jnr-unixsocket" % "0.38.22", // To support Apple ARM "com.spotify" % "docker-client" % "8.16.0", - "com.fasterxml.jackson.dataformat" % "jackson-dataformat-properties" % "2.17.1", + 
"com.fasterxml.jackson.dataformat" % "jackson-dataformat-properties" % "2.18.0", asyncHttpClient ).map(_ % Test) lazy val test = scalaTest +: Seq( logback, "org.scalatestplus" %% "scalacheck-1-16" % "3.2.14.0", - "org.scalacheck" %% "scalacheck" % "1.18.0", + "org.scalacheck" %% "scalacheck" % "1.18.1", "org.mockito" % "mockito-all" % "1.10.19", "org.scalamock" %% "scalamock" % "6.0.0" ).map(_ % Test) @@ -107,9 +107,9 @@ object Dependencies { rocksdb, ("org.rudogma" %%% "supertagged" % "2.0-RC2").exclude("org.scala-js", "scalajs-library_2.13"), "commons-net" % "commons-net" % "3.11.1", - "commons-io" % "commons-io" % "2.16.1", + "commons-io" % "commons-io" % "2.17.0", "com.iheart" %% "ficus" % "1.5.2", - "net.logstash.logback" % "logstash-logback-encoder" % "7.4" % Runtime, + "net.logstash.logback" % "logstash-logback-encoder" % "8.0" % Runtime, kamonCore, kamonModule("system-metrics"), kamonModule("influxdb"), @@ -128,7 +128,7 @@ object Dependencies { nettyHandler, "com.typesafe.scala-logging" %% "scala-logging" % "3.9.5", "eu.timepit" %% "refined" % "0.11.2" exclude ("org.scala-lang.modules", "scala-xml_2.13"), - "com.esaulpaugh" % "headlong" % "11.1.1", + "com.esaulpaugh" % "headlong" % "12.3.0", "com.github.jbellis" % "jamm" % "0.4.0", // Weighing caches web3jModule("abi").excludeAll(ExclusionRule("org.bouncycastle", "bcprov-jdk15on")) ) ++ console ++ logDeps ++ protobuf.value ++ langCompilerPlugins.value @@ -139,7 +139,7 @@ object Dependencies { akkaHttpModule("akka-http-testkit") % Test ) ++ test - val gProto = "com.google.protobuf" % "protobuf-java" % "3.25.2" // grpc 1.64.0 still requires 3.25 + val gProto = "com.google.protobuf" % "protobuf-java" % "3.25.5" // grpc 1.64.0 still requires 3.25 lazy val scalapbRuntime = Def.setting( Seq( @@ -165,7 +165,7 @@ object Dependencies { Seq( rocksdb, "com.github.ben-manes.caffeine" % "caffeine" % "3.1.8", - "net.logstash.logback" % "logstash-logback-encoder" % "7.4" % Runtime, + "net.logstash.logback" % 
"logstash-logback-encoder" % "8.0" % Runtime, kamonModule("caffeine"), kamonModule("prometheus"), sttp3, @@ -179,7 +179,7 @@ object Dependencies { ) lazy val circe = Def.setting { - val circeVersion = "0.14.8" + val circeVersion = "0.14.10" Seq( "io.circe" %%% "circe-core", "io.circe" %%% "circe-generic", diff --git a/project/plugins.sbt b/project/plugins.sbt index 0553d10bd56..58fb9dc7a6a 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -10,20 +10,20 @@ libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.11.17" Seq( "com.eed3si9n" % "sbt-assembly" % "2.2.0", - "com.github.sbt" % "sbt-native-packager" % "1.10.0", + "com.github.sbt" % "sbt-native-packager" % "1.10.4", "se.marcuslonnberg" % "sbt-docker" % "1.11.0", "org.scala-js" % "sbt-scalajs" % "1.16.0", "org.portable-scala" % "sbt-scalajs-crossproject" % "1.3.2", "pl.project13.scala" % "sbt-jmh" % "0.4.7", - "com.github.sbt" % "sbt-ci-release" % "1.5.12", + "com.github.sbt" % "sbt-ci-release" % "1.6.1", "com.lightbend.sbt" % "sbt-javaagent" % "0.1.6" ).map(addSbtPlugin) libraryDependencies ++= Seq( - "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.17.1", + "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.17.2", "org.hjson" % "hjson" % "3.1.0", - "org.vafer" % "jdeb" % "1.10" artifacts Artifact("jdeb", "jar", "jar"), - "org.slf4j" % "jcl-over-slf4j" % "2.0.13", + "org.vafer" % "jdeb" % "1.11" artifacts Artifact("jdeb", "jar", "jar"), + "org.slf4j" % "jcl-over-slf4j" % "2.0.16", ("com.spotify" % "docker-client" % "8.16.0") .exclude("commons-logging", "commons-logging") ) diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/DBResource.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/DBResource.scala deleted file mode 100644 index 6c3eca3c064..00000000000 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/DBResource.scala +++ /dev/null @@ -1,62 +0,0 @@ -package com.wavesplatform.database.rocksdb - 
-import org.rocksdb.{ReadOptions, RocksDB, RocksIterator} - -import scala.collection.View -import scala.collection.mutable.ArrayBuffer - -trait DBResource extends AutoCloseable { - def get[V](key: Key[V]): V - def get(key: Array[Byte]): Array[Byte] - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] - def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] - def prefixIterator: RocksIterator // Should have a single instance - def fullIterator: RocksIterator - def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A - def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A = ()): A -} - -object DBResource { - def apply(db: RocksDB): DBResource = new DBResource { - private[this] val snapshot = db.getSnapshot - private[this] val readOptions = new ReadOptions().setSnapshot(snapshot).setVerifyChecksums(false) - - override def get[V](key: Key[V]): V = key.parse(db.get(readOptions, key.keyBytes)) - - override def get(key: Array[Byte]): Array[Byte] = db.get(readOptions, key) - - override def multiGetFlat[A](keys: ArrayBuffer[Key[Option[A]]], valBufferSizes: ArrayBuffer[Int]): Seq[A] = - db.multiGetFlat(readOptions, keys, valBufferSizes) - - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSizes: ArrayBuffer[Int]): View[A] = - db.multiGet(readOptions, keys, valBufferSizes) - - def multiGet[A](keys: ArrayBuffer[Key[A]], valBufferSize: Int): View[A] = - db.multiGet(readOptions, keys, valBufferSize) - - /** - * Finds the exact key for iter.seek(key) if key.length < 10 and becomes invalid on iter.next(). - * Works as intended if prefix(key).length >= 10. 
- * @see RDB.newColumnFamilyOptions - */ - override lazy val prefixIterator: RocksIterator = db.newIterator(readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) - - override lazy val fullIterator: RocksIterator = db.newIterator(readOptions.setTotalOrderSeek(true)) - - override def withSafePrefixIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = prefixIterator.synchronized { - if (prefixIterator.isOwningHandle) ifNotClosed(prefixIterator) else ifClosed - } - - override def withSafeFullIterator[A](ifNotClosed: RocksIterator => A)(ifClosed: => A): A = fullIterator.synchronized { - if (fullIterator.isOwningHandle) ifNotClosed(fullIterator) else ifClosed - } - - override def close(): Unit = { - prefixIterator.synchronized(prefixIterator.close()) - fullIterator.synchronized(fullIterator.close()) - snapshot.close() - readOptions.close() - } - } -} diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/Key.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/Key.scala deleted file mode 100644 index 6b113983f25..00000000000 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/Key.scala +++ /dev/null @@ -1,52 +0,0 @@ -package com.wavesplatform.database.rocksdb - -import com.google.common.base.CaseFormat -import com.google.common.io.BaseEncoding -import com.google.common.primitives.{Bytes, Shorts} -import org.rocksdb.ColumnFamilyHandle - -abstract class Key[V](prefix: Short, val name: String, val suffix: Array[Byte], val columnFamilyHandle: Option[ColumnFamilyHandle] = None) { - val keyBytes: Array[Byte] = Bytes.concat(Shorts.toByteArray(prefix), suffix) - def parse(bytes: Array[Byte]): V - def encode(v: V): Array[Byte] - - override lazy val toString: String = s"$name($prefix,${BaseEncoding.base16().encode(suffix)})" - - override def equals(obj: Any): Boolean = obj match { - case that: Key[?] 
=> java.util.Arrays.equals(this.keyBytes, that.keyBytes) - case _ => false - } - - override def hashCode(): Int = java.util.Arrays.hashCode(keyBytes) -} -object Key { - private[this] val converter = CaseFormat.UPPER_CAMEL.converterTo(CaseFormat.LOWER_HYPHEN) - private[this] val keyTagToStr = KeyTags.values.toArray.sortBy(_.id).map(v => converter.convert(v.toString)) - - def apply[V]( - keyTag: KeyTags.KeyTag, - keySuffix: Array[Byte], - parser: Array[Byte] => V, - encoder: V => Array[Byte], - cfh: Option[ColumnFamilyHandle] = None - ): Key[V] = - new Key[V](keyTag.id.toShort, keyTagToStr(keyTag.id), keySuffix, cfh) { - override def parse(bytes: Array[Byte]): V = parser(bytes) - override def encode(v: V): Array[Byte] = encoder(v) - } - - def opt[V]( - keyTag: KeyTags.KeyTag, - keySuffix: Array[Byte], - parser: Array[Byte] => V, - encoder: V => Array[Byte], - cfh: Option[ColumnFamilyHandle] = None - ): Key[Option[V]] = - apply[Option[V]]( - keyTag, - keySuffix, - Option(_).map(parser), - _.fold[Array[Byte]](Array.emptyByteArray)(encoder), - cfh - ) -} diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyHelpers.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyHelpers.scala index 607f612ae41..89bf7088b25 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyHelpers.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/KeyHelpers.scala @@ -3,6 +3,7 @@ package com.wavesplatform.database.rocksdb import com.google.common.primitives.{Bytes, Ints, Longs, Shorts} import com.wavesplatform.database.AddressId import com.wavesplatform.state.TxNum +import com.wavesplatform.database.* import java.nio.ByteBuffer diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/RW.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/RW.scala deleted file mode 100644 index d8b975561cb..00000000000 --- 
a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/RW.scala +++ /dev/null @@ -1,29 +0,0 @@ -package com.wavesplatform.database.rocksdb - -import com.wavesplatform.database.rocksdb.stats.RocksDBStats -import com.wavesplatform.database.rocksdb.stats.RocksDBStats.DbHistogramExt -import org.rocksdb.{ReadOptions, RocksDB, WriteBatch} - -class RW(db: RocksDB, readOptions: ReadOptions, batch: WriteBatch) extends ReadOnlyDB(db, readOptions) { - def put[V](key: Key[V], value: V): Int = { - val bytes = key.encode(value) - RocksDBStats.write.recordTagged(key, bytes) - batch.put(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), key.keyBytes, bytes) - bytes.length - } - - def put(key: Array[Byte], value: Array[Byte]): Unit = batch.put(key, value) - - def update[V](key: Key[V])(f: V => V): Unit = put(key, f(get(key))) - - def delete(key: Array[Byte]): Unit = batch.delete(key) - - def delete[V](key: Key[V]): Unit = - batch.delete(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), key.keyBytes) - - def filterHistory(key: Key[Seq[Int]], heightToRemove: Int): Unit = { - val newValue = get(key).filterNot(_ == heightToRemove) - if (newValue.nonEmpty) put(key, newValue) - else delete(key) - } -} diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/ReadOnlyDB.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/ReadOnlyDB.scala deleted file mode 100644 index 9a065c73d3c..00000000000 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/ReadOnlyDB.scala +++ /dev/null @@ -1,90 +0,0 @@ -package com.wavesplatform.database.rocksdb - -import com.google.common.collect.Maps -import com.wavesplatform.database.rocksdb.stats.RocksDBStats -import com.wavesplatform.database.{DBEntry, KeyTags} -import RocksDBStats.DbHistogramExt -import org.rocksdb.{ColumnFamilyHandle, ReadOptions, RocksDB, RocksIterator} - -import scala.annotation.tailrec -import scala.util.Using - -class ReadOnlyDB(db: RocksDB, readOptions: 
ReadOptions) { - def get[V](key: Key[V]): V = { - val bytes = db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes) - RocksDBStats.read.recordTagged(key, bytes) - key.parse(bytes) - } - - def getOpt[V](key: Key[V]): Option[V] = { - val bytes = db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), readOptions, key.keyBytes) - RocksDBStats.read.recordTagged(key, bytes) - if (bytes == null) None else Some(key.parse(bytes)) - } - - def multiGetOpt[V](keys: Seq[Key[Option[V]]], valBufferSize: Int): Seq[Option[V]] = - db.multiGetOpt(readOptions, keys, valBufferSize) - - def multiGet[V](keys: Seq[Key[V]], valBufferSize: Int): Seq[Option[V]] = - db.multiGet(readOptions, keys, valBufferSize) - - def multiGetOpt[V](keys: Seq[Key[Option[V]]], valBufSizes: Seq[Int]): Seq[Option[V]] = - db.multiGetOpt(readOptions, keys, valBufSizes) - - def multiGetInts(keys: Seq[Key[Int]]): Seq[Option[Int]] = - db.multiGetInts(readOptions, keys.map(_.keyBytes)) - - def has[V](key: Key[V]): Boolean = { - val bytes = db.get(readOptions, key.keyBytes) - RocksDBStats.read.recordTagged(key, bytes) - bytes != null - } - - def newIterator: RocksIterator = db.newIterator(readOptions.setTotalOrderSeek(true)) - - def newPrefixIterator: RocksIterator = db.newIterator(readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true)) - - def iterateOverPrefix(tag: KeyTags.KeyTag)(f: DBEntry => Unit): Unit = iterateOverPrefix(tag.prefixBytes)(f) - - def iterateOverPrefix(prefix: Array[Byte])(f: DBEntry => Unit): Unit = { - @tailrec - def loop(iter: RocksIterator): Unit = { - val key = iter.key() - if (iter.isValid) { - f(Maps.immutableEntry(key, iter.value())) - iter.next() - loop(iter) - } else () - } - - Using.resource(db.newIterator(readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true))) { iter => - iter.seek(prefix) - loop(iter) - } - } - - def iterateOver(prefix: Array[Byte], cfh: Option[ColumnFamilyHandle] = None)(f: DBEntry => Unit): 
Unit = - Using.resource(db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true))) { iter => - iter.seek(prefix) - while (iter.isValid && iter.key().startsWith(prefix)) { - f(Maps.immutableEntry(iter.key(), iter.value())) - iter.next() - } - } - - def iterateFrom(prefix: Array[Byte], first: Array[Byte], cfh: Option[ColumnFamilyHandle] = None)(f: DBEntry => Boolean): Unit = { - Using.resource(db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), readOptions.setTotalOrderSeek(true))) { iter => - iter.seek(first) - while (iter.isValid && iter.key().startsWith(prefix)) { - f(Maps.immutableEntry(iter.key(), iter.value())) - iter.next() - } - } - } - - def prefixExists(prefix: Array[Byte]): Boolean = Using.resource(db.newIterator(readOptions.setTotalOrderSeek(false).setPrefixSameAsStart(true))) { - iter => - iter.seek(prefix) - iter.isValid - } -} diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/package.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/package.scala deleted file mode 100644 index b05dbdf68e0..00000000000 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/package.scala +++ /dev/null @@ -1,613 +0,0 @@ -package com.wavesplatform.database - -import com.google.common.base.Charsets.UTF_8 -import com.google.common.collect.{Interners, Maps} -import com.google.common.io.ByteStreams.{newDataInput, newDataOutput} -import com.google.common.io.{ByteArrayDataInput, ByteArrayDataOutput} -import com.google.common.primitives.{Bytes, Ints, Longs} -import com.google.protobuf.ByteString -import com.wavesplatform.account.AddressScheme -import com.wavesplatform.common.state.ByteStr -import com.wavesplatform.common.utils.EitherExt2 -import com.wavesplatform.database.protobuf as pb -import com.wavesplatform.database.protobuf.DataEntry.Value -import com.wavesplatform.lang.script.ScriptReader -import com.wavesplatform.protobuf.transaction.PBRecipients -import 
com.wavesplatform.protobuf.{ByteStrExt, ByteStringExt} -import com.wavesplatform.state.* -import com.wavesplatform.transaction -import com.wavesplatform.transaction.TxPositiveAmount -import com.wavesplatform.utils.* -import monix.eval.Task -import monix.reactive.Observable -import org.rocksdb.* -import sun.nio.ch.Util - -import java.nio.ByteBuffer -import java.util -import java.util.Map as JMap -import scala.annotation.tailrec -import scala.collection.View -import scala.collection.mutable.ArrayBuffer -import scala.jdk.CollectionConverters.* -import scala.util.Using - -//noinspection UnstableApiUsage -package object rocksdb { - final type DBEntry = JMap.Entry[Array[Byte], Array[Byte]] - - implicit class ByteArrayDataOutputExt(val output: ByteArrayDataOutput) extends AnyVal { - def writeByteStr(s: ByteStr): Unit = { - output.write(s.arr) - } - } - - implicit class ByteArrayDataInputExt(val input: ByteArrayDataInput) extends AnyVal { - def readBytes(len: Int): Array[Byte] = { - val arr = new Array[Byte](len) - input.readFully(arr) - arr - } - - def readByteStr(len: Int): ByteStr = { - ByteStr(readBytes(len)) - } - } - - def writeIntSeq(values: Seq[Int]): Array[Byte] = { - values.foldLeft(ByteBuffer.allocate(4 * values.length))(_ putInt _).array() - } - - def readIntSeq(data: Array[Byte]): Seq[Int] = Option(data).fold(Seq.empty[Int]) { d => - val in = ByteBuffer.wrap(data) - Seq.fill(d.length / 4)(in.getInt) - } - - def readAddressIds(data: Array[Byte]): Seq[AddressId] = Option(data).fold(Seq.empty[AddressId]) { d => - require(d.length % java.lang.Long.BYTES == 0, s"Invalid data length: ${d.length}") - val buffer = ByteBuffer.wrap(data) - Seq.fill(d.length / java.lang.Long.BYTES)(AddressId(buffer.getLong)) - } - - def writeAddressIds(values: Seq[AddressId]): Array[Byte] = - values.foldLeft(ByteBuffer.allocate(values.length * java.lang.Long.BYTES)) { case (buf, aid) => buf.putLong(aid.toLong) }.array() - - def readAssetIds(data: Array[Byte]): Seq[ByteStr] = 
Option(data).fold(Seq.empty[ByteStr]) { d => - require(d.length % transaction.AssetIdLength == 0, s"Invalid data length: ${d.length}") - val buffer = ByteBuffer.wrap(d) - Seq.fill(d.length / transaction.AssetIdLength) { - val idBytes = new Array[Byte](transaction.AssetIdLength) - buffer.get(idBytes) - ByteStr(idBytes) - } - } - - def writeAssetIds(values: Seq[ByteStr]): Array[Byte] = - values.foldLeft(ByteBuffer.allocate(values.length * transaction.AssetIdLength)) { case (buf, ai) => buf.put(ai.arr) }.array() - - def readStrings(data: Array[Byte]): Seq[String] = Option(data).fold(Seq.empty[String]) { _ => - var i = 0 - val s = Seq.newBuilder[String] - - while (i < data.length) { - val len = ((data(i) << 8) | (data(i + 1) & 0xff)).toShort // Optimization - s += new String(data, i + 2, len, UTF_8) - i += (2 + len) - } - s.result() - } - - def writeStrings(strings: Seq[String]): Array[Byte] = { - val utfBytes = strings.toVector.map(_.utf8Bytes) - utfBytes - .foldLeft(ByteBuffer.allocate(utfBytes.map(_.length + 2).sum)) { case (buf, bytes) => - buf.putShort(bytes.length.toShort).put(bytes) - } - .array() - } - - def readLeaseBalanceNode(data: Array[Byte]): LeaseBalanceNode = if (data != null && data.length == 20) - LeaseBalanceNode(Longs.fromByteArray(data.take(8)), Longs.fromByteArray(data.slice(8, 16)), Height(Ints.fromByteArray(data.takeRight(4)))) - else LeaseBalanceNode.Empty - - def writeLeaseBalanceNode(leaseBalanceNode: LeaseBalanceNode): Array[Byte] = - Longs.toByteArray(leaseBalanceNode.in) ++ Longs.toByteArray(leaseBalanceNode.out) ++ Ints.toByteArray(leaseBalanceNode.prevHeight) - - def readLeaseBalance(data: Array[Byte]): CurrentLeaseBalance = if (data != null && data.length == 24) - CurrentLeaseBalance( - Longs.fromByteArray(data.take(8)), - Longs.fromByteArray(data.slice(8, 16)), - Height(Ints.fromByteArray(data.slice(16, 20))), - Height(Ints.fromByteArray(data.takeRight(4))) - ) - else CurrentLeaseBalance.Unavailable - - def writeLeaseBalance(lb: 
CurrentLeaseBalance): Array[Byte] = - Longs.toByteArray(lb.in) ++ Longs.toByteArray(lb.out) ++ Ints.toByteArray(lb.height) ++ Ints.toByteArray(lb.prevHeight) - - def writeLeaseDetails(lde: Either[Boolean, LeaseDetails]): Array[Byte] = - lde.fold( - _ => throw new IllegalArgumentException("Can not write boolean flag instead of LeaseDetails"), - ld => - pb.LeaseDetails( - ByteString.copyFrom(ld.sender.arr), - Some(PBRecipients.create(ld.recipientAddress)), - ld.amount.value, - ByteString.copyFrom(ld.sourceId.arr), - ld.height, - ld.status match { - case LeaseDetails.Status.Active => pb.LeaseDetails.CancelReason.Empty - case LeaseDetails.Status.Cancelled(height, cancelTxId) => - pb.LeaseDetails.CancelReason - .Cancelled(pb.LeaseDetails.Cancelled(height, cancelTxId.fold(ByteString.EMPTY)(id => ByteString.copyFrom(id.arr)))) - case LeaseDetails.Status.Expired(height) => pb.LeaseDetails.CancelReason.Expired(pb.LeaseDetails.Expired(height)) - } - ).toByteArray - ) - - def readLeaseDetails(data: Array[Byte]): Either[Boolean, LeaseDetails] = - if (data.length == 1) Left(data(0) == 1) - else { - val d = pb.LeaseDetails.parseFrom(data) - Right( - LeaseDetails( - LeaseStaticInfo( - d.senderPublicKey.toPublicKey, - PBRecipients.toAddress(d.recipient.get, AddressScheme.current.chainId).explicitGet(), - TxPositiveAmount.unsafeFrom(d.amount), - d.sourceId.toByteStr, - d.height - ), - d.cancelReason match { - case pb.LeaseDetails.CancelReason.Empty => LeaseDetails.Status.Active - case pb.LeaseDetails.CancelReason.Expired(pb.LeaseDetails.Expired(height, _)) => LeaseDetails.Status.Expired(height) - case pb.LeaseDetails.CancelReason.Cancelled(pb.LeaseDetails.Cancelled(height, transactionId, _)) => - LeaseDetails.Status.Cancelled(height, Some(transactionId.toByteStr).filter(!_.isEmpty)) - } - - ) - ) - } - - def readVolumeAndFeeNode(data: Array[Byte]): VolumeAndFeeNode = if (data != null && data.length == 20) - VolumeAndFeeNode(Longs.fromByteArray(data.take(8)), 
Longs.fromByteArray(data.slice(8, 16)), Height(Ints.fromByteArray(data.takeRight(4)))) - else VolumeAndFeeNode.Empty - - def writeVolumeAndFeeNode(volumeAndFeeNode: VolumeAndFeeNode): Array[Byte] = - Longs.toByteArray(volumeAndFeeNode.volume) ++ Longs.toByteArray(volumeAndFeeNode.fee) ++ Ints.toByteArray(volumeAndFeeNode.prevHeight) - - def readVolumeAndFee(data: Array[Byte]): CurrentVolumeAndFee = if (data != null && data.length == 24) - CurrentVolumeAndFee( - Longs.fromByteArray(data.take(8)), - Longs.fromByteArray(data.slice(8, 16)), - Height(Ints.fromByteArray(data.slice(16, 20))), - Height(Ints.fromByteArray(data.takeRight(4))) - ) - else CurrentVolumeAndFee.Unavailable - - def writeVolumeAndFee(vf: CurrentVolumeAndFee): Array[Byte] = - Longs.toByteArray(vf.volume) ++ Longs.toByteArray(vf.fee) ++ Ints.toByteArray(vf.height) ++ Ints.toByteArray(vf.prevHeight) - - def readFeatureMap(data: Array[Byte]): Map[Short, Int] = Option(data).fold(Map.empty[Short, Int]) { _ => - val b = ByteBuffer.wrap(data) - val features = Map.newBuilder[Short, Int] - while (b.hasRemaining) { - features += b.getShort -> b.getInt - } - - features.result() - } - - def writeFeatureMap(features: Map[Short, Int]): Array[Byte] = { - val b = ByteBuffer.allocate(features.size * 6) - for ((featureId, height) <- features) - b.putShort(featureId).putInt(height) - - b.array() - } - - def readSponsorship(data: Array[Byte]): SponsorshipValue = { - val ndi = newDataInput(data) - SponsorshipValue(ndi.readLong()) - } - - def writeSponsorship(ai: SponsorshipValue): Array[Byte] = { - val ndo = newDataOutput() - ndo.writeLong(ai.minFee) - ndo.toByteArray - } - - def readAssetDetails(data: Array[Byte]): (AssetInfo, AssetVolumeInfo) = { - - val pbad = pb.AssetDetails.parseFrom(data) - - ( - AssetInfo(pbad.name, pbad.description, Height(pbad.lastRenamedAt)), - AssetVolumeInfo(pbad.reissuable, BigInt(pbad.totalVolume.toByteArray)) - ) - } - - def writeAssetDetails(ai: (AssetInfo, AssetVolumeInfo)): Array[Byte] 
= { - val (info, volumeInfo) = ai - - pb.AssetDetails( - info.name, - info.description, - info.lastUpdatedAt, - volumeInfo.isReissuable, - ByteString.copyFrom(volumeInfo.volume.toByteArray) - ).toByteArray - } - - def writeBlockMeta(data: pb.BlockMeta): Array[Byte] = data.toByteArray - - def readBlockMeta(bs: Array[Byte]): pb.BlockMeta = pb.BlockMeta.parseFrom(bs) - - def readTransactionHNSeqAndType(bs: Array[Byte]): (Height, Seq[(Byte, TxNum, Int)]) = { - val ndi = newDataInput(bs) - val height = Height(ndi.readInt()) - val numSeqLength = ndi.readInt() - - ( - height, - List.fill(numSeqLength) { - val tp = ndi.readByte() - val num = TxNum(ndi.readShort()) - val size = ndi.readInt() - (tp, num, size) - } - ) - } - - def oldReadTransactionHNSeqAndType(bs: Array[Byte]): (Height, Seq[(Byte, TxNum)]) = { - val ndi = newDataInput(bs) - val height = Height(ndi.readInt()) - val numSeqLength = ndi.readInt() - - ( - height, - List.fill(numSeqLength) { - val tp = ndi.readByte() - val num = TxNum(ndi.readShort()) - (tp, num) - } - ) - } - - def writeTransactionHNSeqAndType(v: (Height, Seq[(Byte, TxNum, Int)])): Array[Byte] = { - val (height, numSeq) = v - val numSeqLength = numSeq.length - - val outputLength = 4 + 4 + numSeqLength * (1 + 2 + 4) - val ndo = newDataOutput(outputLength) - - ndo.writeInt(height) - ndo.writeInt(numSeqLength) - numSeq.foreach { case (tp, num, size) => - ndo.writeByte(tp) - ndo.writeShort(num) - ndo.writeInt(size) - } - - ndo.toByteArray - } - - def oldWriteTransactionHNSeqAndType(v: (Height, Seq[(Byte, TxNum)])): Array[Byte] = { - val (height, numSeq) = v - val numSeqLength = numSeq.length - - val outputLength = 4 + 4 + numSeqLength * (4 + 1) - val ndo = newDataOutput(outputLength) - - ndo.writeInt(height) - ndo.writeInt(numSeqLength) - numSeq.foreach { case (tp, num) => - ndo.writeByte(tp) - ndo.writeShort(num) - } - - ndo.toByteArray - } - - private def readDataEntry(key: String)(bs: Array[Byte]): DataEntry[?] 
= - pb.DataEntry.parseFrom(bs).value match { - case Value.Empty => EmptyDataEntry(key) - case Value.IntValue(value) => IntegerDataEntry(key, value) - case Value.BoolValue(value) => BooleanDataEntry(key, value) - case Value.BinaryValue(value) => BinaryDataEntry(key, value.toByteStr) - case Value.StringValue(value) => StringDataEntry(key, value) - } - - private def writeDataEntry(e: DataEntry[?]): Array[Byte] = - pb.DataEntry(e match { - case IntegerDataEntry(_, value) => pb.DataEntry.Value.IntValue(value) - case BooleanDataEntry(_, value) => pb.DataEntry.Value.BoolValue(value) - case BinaryDataEntry(_, value) => pb.DataEntry.Value.BinaryValue(ByteString.copyFrom(value.arr)) - case StringDataEntry(_, value) => pb.DataEntry.Value.StringValue(value) - case _: EmptyDataEntry => pb.DataEntry.Value.Empty - }).toByteArray - - def readCurrentData(key: String)(bs: Array[Byte]): CurrentData = if (bs == null) CurrentData.empty(key) - else - CurrentData( - readDataEntry(key)(bs.drop(8)), - Height(Ints.fromByteArray(bs.take(4))), - Height(Ints.fromByteArray(bs.slice(4, 8))) - ) - - def writeCurrentData(cdn: CurrentData): Array[Byte] = - Ints.toByteArray(cdn.height) ++ Ints.toByteArray(cdn.prevHeight) ++ writeDataEntry(cdn.entry) - - def readDataNode(key: String)(bs: Array[Byte]): DataNode = if (bs == null) DataNode.empty(key) - else - DataNode(readDataEntry(key)(bs.drop(4)), Height(Ints.fromByteArray(bs.take(4)))) - - def writeDataNode(dn: DataNode): Array[Byte] = - Ints.toByteArray(dn.prevHeight) ++ writeDataEntry(dn.entry) - - def readCurrentBalance(bs: Array[Byte]): CurrentBalance = if (bs != null && bs.length == 16) - CurrentBalance(Longs.fromByteArray(bs.take(8)), Height(Ints.fromByteArray(bs.slice(8, 12))), Height(Ints.fromByteArray(bs.takeRight(4)))) - else CurrentBalance.Unavailable - - def writeCurrentBalance(balance: CurrentBalance): Array[Byte] = - Longs.toByteArray(balance.balance) ++ Ints.toByteArray(balance.height) ++ Ints.toByteArray(balance.prevHeight) - - def 
readBalanceNode(bs: Array[Byte]): BalanceNode = if (bs != null && bs.length == 12) - BalanceNode(Longs.fromByteArray(bs.take(8)), Height(Ints.fromByteArray(bs.takeRight(4)))) - else BalanceNode.Empty - - def writeBalanceNode(balance: BalanceNode): Array[Byte] = - Longs.toByteArray(balance.balance) ++ Ints.toByteArray(balance.prevHeight) - - implicit class DBExt(val db: RocksDB) extends AnyVal { - - def readOnly[A](f: ReadOnlyDB => A): A = { - Using.resource(db.getSnapshot) { s => - Using.resource(new ReadOptions().setSnapshot(s).setVerifyChecksums(false)) { ro => - f(new ReadOnlyDB(db, ro)) - } - }((resource: Snapshot) => db.releaseSnapshot(resource)) - } - - /** @note - * Runs operations in batch, so keep in mind, that previous changes don't appear lately in f - */ - def readWrite[A](f: RW => A): A = { - Using.resource(db.getSnapshot) { s => - Using.resource(new ReadOptions().setSnapshot(s).setVerifyChecksums(false)) { ro => - Using.resource(new WriteOptions().setSync(false).setDisableWAL(true)) { wo => - Using.resource(new WriteBatch()) { wb => - val r = f(new RW(db, ro, wb)) - db.write(wo, wb) - r - } - } - } - } { (resource: Snapshot) => - db.releaseSnapshot(resource) - resource.close() - } - } - - def multiGetOpt[A](readOptions: ReadOptions, keys: Seq[Key[Option[A]]], valBufSize: Int): Seq[Option[A]] = - multiGetOpt(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(keys.size, valBufSize)) - - def multiGetOpt[A](readOptions: ReadOptions, keys: Seq[Key[Option[A]]], valBufSizes: Seq[Int]): Seq[Option[A]] = - multiGetOpt(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(valBufSizes)) - - def multiGet[A](readOptions: ReadOptions, keys: ArrayBuffer[Key[A]], valBufSizes: ArrayBuffer[Int]): View[A] = - multiGet(readOptions, keys, getKeyBuffersFromKeys(keys), getValueBuffers(valBufSizes)) - - def multiGet[A](readOptions: ReadOptions, keys: ArrayBuffer[Key[A]], valBufSize: Int): View[A] = - multiGet(readOptions, keys, 
getKeyBuffersFromKeys(keys), getValueBuffers(keys.size, valBufSize)) - - def multiGet[A](readOptions: ReadOptions, keys: Seq[Key[A]], valBufSize: Int): Seq[Option[A]] = { - val keyBufs = getKeyBuffersFromKeys(keys) - val valBufs = getValueBuffers(keys.size, valBufSize) - - val result = keys.view - .zip(db.multiGetByteBuffers(readOptions, keyBufs, valBufs).asScala) - .map { case (parser, value) => - if (value.status.getCode == Status.Code.Ok) { - val arr = new Array[Byte](value.requiredSize) - value.value.get(arr) - Util.releaseTemporaryDirectBuffer(value.value) - Some(parser.parse(arr)) - } else None - } - .toSeq - - keyBufs.forEach(Util.releaseTemporaryDirectBuffer(_)) - result - } - - def multiGetInts(readOptions: ReadOptions, keys: Seq[Array[Byte]]): Seq[Option[Int]] = { - val keyBufs = getKeyBuffers(keys) - val valBufs = getValueBuffers(keys.size, 4) - - val result = db - .multiGetByteBuffers(readOptions, keyBufs, valBufs) - .asScala - .map { value => - if (value.status.getCode == Status.Code.Ok) { - val h = Some(value.value.getInt) - Util.releaseTemporaryDirectBuffer(value.value) - h - } else None - } - .toSeq - - keyBufs.forEach(Util.releaseTemporaryDirectBuffer(_)) - result - } - - def multiGetFlat[A](readOptions: ReadOptions, keys: ArrayBuffer[Key[Option[A]]], valBufSizes: ArrayBuffer[Int]): Seq[A] = { - val keyBufs = getKeyBuffersFromKeys(keys) - val valBufs = getValueBuffers(valBufSizes) - - val result = keys.view - .zip(db.multiGetByteBuffers(readOptions, keyBufs, valBufs).asScala) - .flatMap { case (parser, value) => - if (value.status.getCode == Status.Code.Ok) { - val arr = new Array[Byte](value.requiredSize) - value.value.get(arr) - Util.releaseTemporaryDirectBuffer(value.value) - parser.parse(arr) - } else None - } - .toSeq - - keyBufs.forEach(Util.releaseTemporaryDirectBuffer(_)) - result - } - - def get[A](key: Key[A]): A = key.parse(db.get(key.columnFamilyHandle.getOrElse(db.getDefaultColumnFamily), key.keyBytes)) - def get[A](key: Key[A], 
readOptions: ReadOptions): A = key.parse(db.get(readOptions, key.keyBytes)) - def has(key: Key[?]): Boolean = db.get(key.keyBytes) != null - - def iterateOver(tag: KeyTags.KeyTag)(f: DBEntry => Unit): Unit = iterateOver(tag.prefixBytes)(f) - - def iterateOver(prefix: Array[Byte], seekPrefix: Array[Byte] = Array.emptyByteArray, cfh: Option[ColumnFamilyHandle] = None)( - f: DBEntry => Unit - ): Unit = { - @tailrec - def loop(iter: RocksIterator): Unit = { - if (iter.isValid && iter.key().startsWith(prefix)) { - f(Maps.immutableEntry(iter.key(), iter.value())) - iter.next() - loop(iter) - } else () - } - - val iterator = db.newIterator(cfh.getOrElse(db.getDefaultColumnFamily), new ReadOptions().setTotalOrderSeek(true)) - try { - iterator.seek(Bytes.concat(prefix, seekPrefix)) - loop(iterator) - } finally iterator.close() - } - - def resourceObservable: Observable[DBResource] = Observable.resource(Task(DBResource(db)))(r => Task(r.close())) - - def withResource[A](f: DBResource => A): A = { - val resource = DBResource(db) - try f(resource) - finally resource.close() - } - - private def getKeyBuffersFromKeys(keys: collection.Seq[Key[?]]): util.List[ByteBuffer] = - keys.map { k => - val arr = k.keyBytes - val b = Util.getTemporaryDirectBuffer(arr.length) - b.put(k.keyBytes).flip() - b - }.asJava - - private def getKeyBuffers(keys: collection.Seq[Array[Byte]]): util.List[ByteBuffer] = - keys.map { k => - val b = Util.getTemporaryDirectBuffer(k.length) - b.put(k).flip() - b - }.asJava - - private def getValueBuffers(amount: Int, bufferSize: Int): util.List[ByteBuffer] = - List - .fill(amount) { - val buf = Util.getTemporaryDirectBuffer(bufferSize) - buf.limit(buf.capacity()) - buf - } - .asJava - - private def getValueBuffers(bufferSizes: collection.Seq[Int]): util.List[ByteBuffer] = - bufferSizes.map { size => - val buf = Util.getTemporaryDirectBuffer(size) - buf.limit(buf.capacity()) - buf - }.asJava - - private def multiGetOpt[A]( - readOptions: ReadOptions, - keys: 
Seq[Key[Option[A]]], - keyBufs: util.List[ByteBuffer], - valBufs: util.List[ByteBuffer] - ): Seq[Option[A]] = { - val result = keys.view - .zip(db.multiGetByteBuffers(readOptions, keyBufs, valBufs).asScala) - .map { case (parser, value) => - if (value.status.getCode == Status.Code.Ok) { - val arr = new Array[Byte](value.requiredSize) - value.value.get(arr) - Util.releaseTemporaryDirectBuffer(value.value) - parser.parse(arr) - } else None - } - .toSeq - - keyBufs.forEach(Util.releaseTemporaryDirectBuffer(_)) - result - } - - private def multiGet[A]( - readOptions: ReadOptions, - keys: ArrayBuffer[Key[A]], - keyBufs: util.List[ByteBuffer], - valBufs: util.List[ByteBuffer] - ): View[A] = { - val result = keys.view - .zip(db.multiGetByteBuffers(readOptions, keyBufs, valBufs).asScala) - .flatMap { case (parser, value) => - if (value.status.getCode == Status.Code.Ok) { - val arr = new Array[Byte](value.requiredSize) - value.value.get(arr) - Util.releaseTemporaryDirectBuffer(value.value) - Some(parser.parse(arr)) - } else None - } - - keyBufs.forEach(Util.releaseTemporaryDirectBuffer(_)) - result - } - } - - def writeAssetScript(script: AssetScriptInfo): Array[Byte] = - Longs.toByteArray(script.complexity) ++ script.script.bytes().arr - - def readAssetScript(b: Array[Byte]): AssetScriptInfo = - AssetScriptInfo(ScriptReader.fromBytes(b.drop(8)).explicitGet(), Longs.fromByteArray(b)) - - def writeAccountScriptInfo(scriptInfo: AccountScriptInfo): Array[Byte] = - pb.AccountScriptInfo.toByteArray( - pb.AccountScriptInfo( - scriptInfo.publicKey.toByteString, - scriptInfo.script.bytes().toByteString, - scriptInfo.verifierComplexity, - scriptInfo.complexitiesByEstimator.map { case (version, complexities) => - pb.AccountScriptInfo.ComplexityByVersion(version, complexities) - }.toSeq - ) - ) - - private val scriptInterner = Interners.newWeakInterner[AccountScriptInfo]() - - def readAccountScriptInfo(b: Array[Byte]): AccountScriptInfo = { - val asi = 
pb.AccountScriptInfo.parseFrom(b) - scriptInterner.intern( - AccountScriptInfo( - asi.publicKey.toPublicKey, - ScriptReader.fromBytes(asi.scriptBytes.toByteArray).explicitGet(), - asi.maxComplexity, - asi.callableComplexity.map { c => - c.version -> c.callableComplexity - }.toMap - ) - ) - } - - implicit final class Ops(private val value: AddressId) extends AnyVal { - def toByteArray: Array[Byte] = Longs.toByteArray(AddressId.raw(value)) - } - - implicit class LongExt(val l: Long) extends AnyVal { - def toByteArray: Array[Byte] = Longs.toByteArray(l) - } -} diff --git a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/stats/RocksDBStats.scala b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/stats/RocksDBStats.scala index 6504c914a26..897bfcc421b 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/stats/RocksDBStats.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/database/rocksdb/stats/RocksDBStats.scala @@ -1,6 +1,6 @@ package com.wavesplatform.database.rocksdb.stats -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.Key import kamon.Kamon import kamon.metric.{MeasurementUnit, Metric} diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/DefaultDiskCaches.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/DefaultDiskCaches.scala index 5efbdbe2a4e..57639c12db5 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/DefaultDiskCaches.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/DefaultDiskCaches.scala @@ -6,7 +6,7 @@ import com.wavesplatform.account.{Address, Alias} import com.wavesplatform.blockchain.SignedBlockHeaderWithVrf import com.wavesplatform.collections.syntax.* import com.wavesplatform.database.AddressId -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.* import com.wavesplatform.ride.runner.caches.* import 
com.wavesplatform.ride.runner.db.{ReadOnly, ReadWrite, RideDbAccess} import com.wavesplatform.ride.runner.stats.KamonCaffeineStats diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/KvPairs.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/KvPairs.scala index 77c059716f4..a6645cc536c 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/KvPairs.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/caches/disk/KvPairs.scala @@ -9,8 +9,7 @@ import com.wavesplatform.block.SignedBlockHeader import com.wavesplatform.blockchain.SignedBlockHeaderWithVrf import com.wavesplatform.common.utils.EitherExt2 import com.wavesplatform.database.protobuf.{StaticAssetInfo, BlockMeta as PBBlockMeta} -import com.wavesplatform.database.rocksdb.{Key, readAccountScriptInfo, readAssetDetails, readAssetScript, readBlockMeta, writeAccountScriptInfo, writeAssetDetails, writeAssetScript, writeBlockMeta} -import com.wavesplatform.database.{AddressId, toVanillaTransaction, protobuf as pb} +import com.wavesplatform.database.{protobuf as pb, *} import com.wavesplatform.protobuf.block.PBBlocks import com.wavesplatform.protobuf.transaction.PBTransactions import com.wavesplatform.protobuf.{ByteStrExt, ByteStringExt} diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/BatchedReadOnly.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/BatchedReadOnly.scala index d96062910e1..4313528c2ca 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/BatchedReadOnly.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/BatchedReadOnly.scala @@ -2,7 +2,7 @@ package com.wavesplatform.ride.runner.db import com.google.common.collect.Maps import com.wavesplatform.database.DBEntry -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.Key import com.wavesplatform.database.rocksdb.stats.RocksDBStats import 
com.wavesplatform.database.rocksdb.stats.RocksDBStats.DbHistogramExt import org.rocksdb.* diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/BatchedReadWrite.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/BatchedReadWrite.scala index dbe0074e7a5..f275e51942d 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/BatchedReadWrite.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/BatchedReadWrite.scala @@ -1,6 +1,6 @@ package com.wavesplatform.ride.runner.db -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.Key import com.wavesplatform.database.rocksdb.stats.RocksDBStats import com.wavesplatform.database.rocksdb.stats.RocksDBStats.DbHistogramExt import com.wavesplatform.utils.OptimisticLockable diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/DirectReadOnly.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/DirectReadOnly.scala index 2111e164b4c..f33ce309245 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/DirectReadOnly.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/DirectReadOnly.scala @@ -2,7 +2,7 @@ package com.wavesplatform.ride.runner.db import com.google.common.collect.Maps import com.wavesplatform.database.DBEntry -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.Key import com.wavesplatform.database.rocksdb.stats.RocksDBStats import com.wavesplatform.database.rocksdb.stats.RocksDBStats.DbHistogramExt import com.wavesplatform.ride.runner.caches.disk.KvPair diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/DirectReadWrite.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/DirectReadWrite.scala index 41b88e15623..d06d68aec69 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/DirectReadWrite.scala +++ 
b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/DirectReadWrite.scala @@ -1,6 +1,6 @@ package com.wavesplatform.ride.runner.db -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.Key import com.wavesplatform.database.rocksdb.stats.RocksDBStats import com.wavesplatform.database.rocksdb.stats.RocksDBStats.DbHistogramExt import com.wavesplatform.utils.OptimisticLockable diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/HasDb.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/HasDb.scala index c084a9e5be0..ac0c56b9384 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/HasDb.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/HasDb.scala @@ -1,6 +1,6 @@ package com.wavesplatform.ride.runner.db -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.Key import org.rocksdb.{ColumnFamilyHandle, RocksDB} trait HasDb { diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadOnly.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadOnly.scala index 1d05406b485..d3b5c1b692f 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadOnly.scala +++ b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadOnly.scala @@ -1,7 +1,7 @@ package com.wavesplatform.ride.runner.db import com.wavesplatform.database.DBEntry -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.Key import com.wavesplatform.ride.runner.caches.RemoteData import com.wavesplatform.ride.runner.caches.disk.{KvHistoryPair, KvPair} import com.wavesplatform.state.Height diff --git a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadWrite.scala b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadWrite.scala index 93adc319f9b..3c6d7c30b65 100644 --- a/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadWrite.scala +++ 
b/ride-runner/src/main/scala/com/wavesplatform/ride/runner/db/ReadWrite.scala @@ -1,7 +1,7 @@ package com.wavesplatform.ride.runner.db import com.google.common.primitives.Ints -import com.wavesplatform.database.rocksdb.Key +import com.wavesplatform.database.Key import com.wavesplatform.ride.runner.caches.RemoteData import com.wavesplatform.ride.runner.caches.disk.KvHistoryPair import com.wavesplatform.ride.runner.db.Heights.{splitHeightsAt, splitHeightsAtRollback} diff --git a/ride-runner/src/test/scala/com/wavesplatform/ride/runner/entrypoints/RideRunnerWithPreparedStateTestSuite.scala b/ride-runner/src/test/scala/com/wavesplatform/ride/runner/entrypoints/RideRunnerWithPreparedStateTestSuite.scala index 1aea2854736..360a8b2f566 100644 --- a/ride-runner/src/test/scala/com/wavesplatform/ride/runner/entrypoints/RideRunnerWithPreparedStateTestSuite.scala +++ b/ride-runner/src/test/scala/com/wavesplatform/ride/runner/entrypoints/RideRunnerWithPreparedStateTestSuite.scala @@ -10,6 +10,7 @@ class RideRunnerWithPreparedStateTestSuite extends BaseTestSuite with HasTestAcc val sampleInput = ConfigFactory.parseResources("sample-input.conf") val input = RideRunnerInputParser.from(RideRunnerInputParser.prepare(sampleInput)) val r = WavesRideRunnerWithPreparedStateApp.run(input) + println(r) (r \ "result" \ "value" \ "_2" \ "value").validate[BigInt] shouldBe JsSuccess(BigInt("9007199361531057")) } }