Skip to content

Commit

Permalink
Stability improvements (wavesplatform#3938)
Browse files Browse the repository at this point in the history
  • Loading branch information
phearnot authored Feb 5, 2024
1 parent 993c7c2 commit c94bf22
Show file tree
Hide file tree
Showing 40 changed files with 457 additions and 256 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/publish-docker-node.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ jobs:

- name: Build sources
run: |
sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt buildTarballsForDocker
sbt --mem 2048 -J-XX:+UseG1GC -Dcoursier.cache=~/.cache/coursier -Dsbt.boot.directory=~/.sbt ;buildTarballsForDocker;buildRIDERunnerForDocker
- name: Setup Docker buildx
uses: docker/setup-buildx-action@v2
Expand Down
13 changes: 8 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ lazy val repl = crossProject(JSPlatform, JVMPlatform)
libraryDependencies ++=
Dependencies.protobuf.value ++
Dependencies.langCompilerPlugins.value ++
Dependencies.circe.value ++
Seq(
"org.scala-js" %%% "scala-js-macrotask-executor" % "1.0.0"
),
Dependencies.circe.value,
inConfig(Compile)(
Seq(
PB.targets += scalapb.gen(flatPackage = true) -> sourceManaged.value,
Expand All @@ -109,6 +106,9 @@ lazy val `repl-jvm` = repl.jvm
)

lazy val `repl-js` = repl.js.dependsOn(`lang-js`)
.settings(
libraryDependencies += "org.scala-js" %%% "scala-js-macrotask-executor" % "1.1.1"
)

lazy val `curve25519-test` = project.dependsOn(node)

Expand Down Expand Up @@ -198,6 +198,10 @@ buildTarballsForDocker := {
(`grpc-server` / Universal / packageZipTarball).value,
baseDirectory.value / "docker" / "target" / "waves-grpc-server.tgz"
)
}

lazy val buildRIDERunnerForDocker = taskKey[Unit]("Package RIDE Runner tarball and copy it to docker/target")
buildRIDERunnerForDocker := {
IO.copyFile(
(`ride-runner` / Universal / packageZipTarball).value,
(`ride-runner` / baseDirectory).value / "docker" / "target" / s"${(`ride-runner` / name).value}.tgz"
Expand Down Expand Up @@ -244,7 +248,6 @@ lazy val buildDebPackages = taskKey[Unit]("Build debian packages")
buildDebPackages := {
(`grpc-server` / Debian / packageBin).value
(node / Debian / packageBin).value
(`ride-runner` / Debian / packageBin).value
}

def buildPackages: Command = Command("buildPackages")(_ => Network.networkParser) { (state, args) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.wavesplatform.events

import com.wavesplatform.block.{Block, MicroBlock}
import com.wavesplatform.common.state.ByteStr
import com.wavesplatform.database.RDB
import com.wavesplatform.events.api.grpc.protobuf.BlockchainUpdatesApiGrpc
import com.wavesplatform.events.settings.BlockchainUpdatesSettings
import com.wavesplatform.extensions.{Context, Extension}
Expand All @@ -14,6 +13,7 @@ import io.grpc.{Metadata, Server, ServerStreamTracer, Status}
import monix.execution.schedulers.SchedulerService
import monix.execution.{ExecutionModel, Scheduler, UncaughtExceptionReporter}
import net.ceedubs.ficus.Ficus.*
import org.rocksdb.RocksDB

import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
Expand All @@ -31,9 +31,8 @@ class BlockchainUpdates(private val context: Context) extends Extension with Sco
)

private[this] val settings = context.settings.config.as[BlockchainUpdatesSettings]("waves.blockchain-updates")
// todo: no need to open column families here
private[this] val rdb = RDB.open(context.settings.dbSettings.copy(directory = context.settings.directory + "/blockchain-updates"))
private[this] val repo = new Repo(rdb.db, context.blocksApi)
private[this] val rdb = RocksDB.open(context.settings.directory + "/blockchain-updates")
private[this] val repo = new Repo(rdb, context.blocksApi)

private[this] val grpcServer: Server = NettyServerBuilder
.forAddress(new InetSocketAddress("0.0.0.0", settings.grpcPort))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ object Curve25519 {
def sign(privateKey: Array[Byte], message: Array[Byte]): Array[Byte] =
provider.calculateSignature(provider.getRandom(SignatureLength), privateKey, message)

def verify(signature: Array[Byte], message: Array[Byte], publicKey: Array[Byte]): Boolean = provider.verifySignature(publicKey, message, signature)

def verify(signature: Array[Byte], message: Array[Byte], publicKey: Array[Byte]): Boolean =
signature != null && signature.length == SignatureLength &&
publicKey != null && publicKey.length == KeyLength &&
message != null &&
provider.verifySignature(publicKey, message, signature)
}
14 changes: 0 additions & 14 deletions lang/testkit/src/test/resources/logback-test.xml

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.wavesplatform.report

import com.wavesplatform.report.QaseReporter.{CaseIdPattern, QaseProjects, TestResult}
import io.qase.api.QaseClient
import io.qase.api.config.QaseConfig
import io.qase.api.utils.IntegrationUtils
import io.qase.client.model.ResultCreate
import org.aeonbits.owner.ConfigFactory
import org.scalatest.Reporter
import org.scalatest.events.*
import play.api.libs.json.{Format, Json}
Expand Down Expand Up @@ -45,7 +46,7 @@ class QaseReporter extends Reporter {
msgOpt: Option[String],
duration: Option[Long]
): Unit =
if (QaseClient.isEnabled) {
if (QaseReporter.isEnabled) {
val errMsg = msgOpt.map(msg => s"\n\n**Error**\n$msg").getOrElse("")
val comment = s"$testName$errMsg"
val stacktrace = throwable.map(IntegrationUtils.getStacktrace)
Expand All @@ -55,7 +56,7 @@ class QaseReporter extends Reporter {
}

private def saveRunResults(): Unit =
if (QaseClient.isEnabled) {
if (QaseReporter.isEnabled) {
results.asScala.foreach { case (projectCode, results) =>
if (results.nonEmpty) {
val writer = new FileWriter(s"./$projectCode-${System.currentTimeMillis()}")
Expand All @@ -73,6 +74,10 @@ class QaseReporter extends Reporter {
}

object QaseReporter {
// this hack prevents QaseClient class from being initialized, which in turn initializes Logback with malformed config
// and prints a warning about unused appender to stdout
private[QaseReporter] val isEnabled = ConfigFactory.create(classOf[QaseConfig]).isEnabled

val RunIdKeyPrefix = "QASE_RUN_ID_"
val CheckPRRunIdKey = "CHECKPR_RUN_ID"
val QaseProjects = Seq("NODE", "RIDE", "BU", "SAPI")
Expand Down
14 changes: 0 additions & 14 deletions lang/tests/src/test/resources/logback-test.xml

This file was deleted.

4 changes: 2 additions & 2 deletions node-it/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ inTask(docker)(
)
)

val packageAll = taskKey[Unit]("build all packages")
docker := docker.dependsOn(LocalProject("waves-node") / packageAll).value
val buildTarballsForDocker = taskKey[Unit]("build all packages")
docker := docker.dependsOn(LocalProject("waves-node") / buildTarballsForDocker).value
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {

private val settings: WavesSettings = wavesSettings.copy(minerSettings = wavesSettings.minerSettings.copy(quorum = 0))

def generateDb(genBlocks: Seq[GenBlock], dbDirPath: String = settings.dbSettings.directory): Unit =
def generateDb(genBlocks: Iterator[GenBlock], dbDirPath: String = settings.dbSettings.directory): Unit =
generateBlockchain(genBlocks, settings.dbSettings.copy(directory = dbDirPath))

def generateBinaryFile(genBlocks: Seq[GenBlock]): Unit = {
def generateBinaryFile(genBlocks: Iterator[GenBlock]): Unit = {
val targetHeight = genBlocks.size + 1
log.info(s"Exporting to $targetHeight")
val outputFilename = s"blockchain-$targetHeight"
Expand All @@ -94,7 +94,7 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {
}
}

private def generateBlockchain(genBlocks: Seq[GenBlock], dbSettings: DBSettings, exportToFile: Block => Unit = _ => ()): Unit = {
private def generateBlockchain(genBlocks: Iterator[GenBlock], dbSettings: DBSettings, exportToFile: Block => Unit = _ => ()): Unit = {
val scheduler = Schedulers.singleThread("appender")
val time = new Time {
val startTime: Long = settings.blockchainSettings.genesisSettings.timestamp
Expand Down Expand Up @@ -185,7 +185,7 @@ class BlockchainGenerator(wavesSettings: WavesSettings) extends ScorexLogging {
}
case Left(err) => log.error(s"Error appending block: $err")
}
}
}.get
}

private def correctTxTimestamp(genTx: GenTx, time: Time): Transaction =
Expand Down
6 changes: 3 additions & 3 deletions node/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ waves {
max-cache-size = 100000

max-rollback-depth = 2000
remember-blocks = 3h

# Delete old history entries (Data, WAVES and Asset balances) in this interval before a safe rollback height.
# Comment to disable.
Expand All @@ -40,15 +39,16 @@ waves {
# AA Asset balance history for address.
# cleanup-interval = 500 # Optimal for Xmx2G

use-bloom-filter = false

rocksdb {
main-cache-size = 512M
tx-cache-size = 16M
tx-meta-cache-size = 16M
tx-snapshot-cache-size = 16M
api-cache-size=16M
write-buffer-size = 128M
enable-statistics = false
# When enabled, after writing every SST file of the default column family, reopen it and read all the keys.
paranoid-checks = off
}
}

Expand Down
86 changes: 86 additions & 0 deletions node/src/main/scala/com/wavesplatform/Explorer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,92 @@ object Explorer extends ScorexLogging {
log.info(s"Load meta for $id")
val meta = rdb.db.get(Keys.transactionMetaById(TransactionId(ByteStr.decodeBase58(id).get), rdb.txMetaHandle))
log.info(s"Meta: $meta")
case "DH" =>
val address = Address.fromString(argument(1, "address")).explicitGet()
val key = argument(2, "key")
val requestedHeight = argument(3, "height").toInt
log.info(s"Loading address ID for $address")
val addressId = rdb.db.get(Keys.addressId(address)).get
log.info(s"Collecting data history for key $key on $address ($addressId)")
val currentEntry = rdb.db.get(Keys.data(addressId, key))
log.info(s"Current entry: $currentEntry")
val problematicEntry = rdb.db.get(Keys.dataAt(addressId, key)(requestedHeight))
log.info(s"Entry at $requestedHeight: $problematicEntry")
case "DHC" =>
log.info("Looking for data entry history corruptions")
var thisAddressId = 0L
var prevHeight = 0
var key = ""
var addressCount = 0
rdb.db.iterateOver(KeyTags.DataHistory.prefixBytes, None) { e =>
val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(2, 10))
val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
val keyFromKey = new String(e.getKey.drop(10).dropRight(4), "utf-8")
if (addressIdFromKey != thisAddressId) {
thisAddressId = addressIdFromKey
key = keyFromKey
addressCount += 1
} else if (key != keyFromKey) {
key = keyFromKey
} else {
val node = readDataNode(key)(e.getValue)
if (node.prevHeight != prevHeight) {
val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
log.warn(s"$address/$key@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")

}
}
prevHeight = heightFromKey
}
log.info(s"Checked $addressCount addresses")
case "ABHC" =>
log.info("Looking for asset balance history corruptions")
var thisAddressId = 0L
var prevHeight = 0
var key = IssuedAsset(ByteStr(new Array[Byte](32)))
var addressCount = 0
rdb.db.iterateOver(KeyTags.AssetBalanceHistory.prefixBytes, None) { e =>
val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(34, 42))
val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
val keyFromKey = IssuedAsset(ByteStr(e.getKey.slice(2, 34)))
if (keyFromKey != key) {
thisAddressId = addressIdFromKey
key = keyFromKey
addressCount += 1
} else if (thisAddressId != addressIdFromKey) {
thisAddressId = addressIdFromKey
} else {
val node = readBalanceNode(e.getValue)
if (node.prevHeight != prevHeight) {
val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
log.warn(s"$key/$address@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")

}
}
prevHeight = heightFromKey
}
log.info(s"Checked $addressCount assets")
case "BHC" =>
log.info("Looking for balance history corruptions")
var thisAddressId = 0L
var prevHeight = 0
var addressCount = 0
rdb.db.iterateOver(KeyTags.WavesBalanceHistory.prefixBytes, None) { e =>
val addressIdFromKey = Longs.fromByteArray(e.getKey.slice(2, 10))
val heightFromKey = Ints.fromByteArray(e.getKey.takeRight(4))
if (addressIdFromKey != thisAddressId) {
thisAddressId = addressIdFromKey
addressCount += 1
} else {
val node = readBalanceNode(e.getValue)
if (node.prevHeight != prevHeight) {
val address = rdb.db.get(Keys.idToAddress(AddressId(thisAddressId)))
log.warn(s"$address@$heightFromKey: node.prevHeight=${node.prevHeight}, actual=$prevHeight")
}
}
prevHeight = heightFromKey
}
log.info(s"Checked $addressCount addresses")
}
} finally {
reader.close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ object GenesisBlockGenerator {
.headOption
.map(new File(_).getAbsoluteFile.ensuring(f => !f.isDirectory && f.getParentFile.isDirectory || f.getParentFile.mkdirs()))

val settings = parseSettings(ConfigFactory.parseFile(inputConfFile))
val settings = parseSettings(ConfigFactory.parseFile(inputConfFile).resolve())
val confBody = createConfig(settings)
outputConfFile.foreach(ocf => Files.write(ocf.toPath, confBody.utf8Bytes))
}
Expand Down
15 changes: 9 additions & 6 deletions node/src/main/scala/com/wavesplatform/Importer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import akka.actor.ActorSystem
import cats.implicits.catsSyntaxOption
import cats.syntax.apply.*
import com.google.common.io.ByteStreams
import com.google.common.primitives.Ints
import com.google.common.primitives.{Ints, Longs}
import com.wavesplatform.Exporter.Formats
import com.wavesplatform.api.common.{CommonAccountsApi, CommonAssetsApi, CommonBlocksApi, CommonTransactionsApi}
import com.wavesplatform.block.{Block, BlockHeader}
Expand Down Expand Up @@ -217,6 +217,8 @@ object Importer extends ScorexLogging {
val maxSize = importOptions.maxQueueSize
val queue = new mutable.Queue[(VanillaBlock, Option[BlockSnapshotResponse])](maxSize)

val CurrentTS = System.currentTimeMillis()

@tailrec
def readBlocks(queue: mutable.Queue[(VanillaBlock, Option[BlockSnapshotResponse])], remainCount: Int, maxCount: Int): Unit = {
if (remainCount == 0) ()
Expand Down Expand Up @@ -247,11 +249,12 @@ object Importer extends ScorexLogging {
if (blocksToSkip > 0) {
blocksToSkip -= 1
} else {
val blockV5 = blockchain.isFeatureActivated(BlockchainFeatures.BlockV5, blockchain.height + (maxCount - remainCount) + 1)
val rideV6 = blockchain.isFeatureActivated(BlockchainFeatures.RideV6, blockchain.height + (maxCount - remainCount) + 1)
lazy val parsedProtoBlock = PBBlocks.vanilla(PBBlocks.addChainId(protobuf.block.PBBlock.parseFrom(blockBytes)), unsafe = true)

val block = (if (!blockV5) Block.parseBytes(blockBytes) else parsedProtoBlock).orElse(parsedProtoBlock).get
val block = (if (1 < blockBytes.head && blockBytes.head < 5 && Longs.fromByteArray(blockBytes.slice(1, 9)) < CurrentTS)
Block.parseBytes(blockBytes).orElse(parsedProtoBlock)
else
parsedProtoBlock).get
val blockSnapshot = snapshotsBytes.map { bytes =>
BlockSnapshotResponse(
block.id(),
Expand Down Expand Up @@ -308,7 +311,7 @@ object Importer extends ScorexLogging {
case _ =>
counter = counter + 1
}
} else {
} else if (!quit){
log.warn(s"Block $block is not a child of the last block ${blockchain.lastBlockId.get}")
}
}
Expand Down Expand Up @@ -360,7 +363,7 @@ object Importer extends ScorexLogging {
val blocksFileOffset =
importOptions.format match {
case Formats.Binary =>
var blocksOffset = 0
var blocksOffset = 0L
rdb.db.iterateOver(KeyTags.BlockInfoAtHeight) { e =>
e.getKey match {
case Array(_, _, 0, 0, 0, 1) => // Skip genesis
Expand Down
Loading

0 comments on commit c94bf22

Please sign in to comment.