Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: abci state sync #2413

Open
wants to merge 2 commits into
base: v1.8-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,475 changes: 812 additions & 663 deletions Cargo.lock

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ transport = "routed"
#]
grpc-concurrency = [
{ "check_tx" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "list_snapshots" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "load_snapshot_chunk" = {{= it.platform.drive.tenderdash.mempool.maxConcurrentCheckTx }} },
{ "offer_snapshot" = 1 },
{ "apply_snapshot_chunk" = 1 },
]


Expand Down Expand Up @@ -414,7 +418,7 @@ ttl-num-blocks = {{=it.platform.drive.tenderdash.mempool.ttlNumBlocks}}
# the network to take and serve state machine snapshots. State sync is not attempted if the node
# has any local state (LastBlockHeight > 0). The node will have a truncated block history,
# starting from the height of the snapshot.
enable = false
enable = true

# State sync uses light client verification to verify state. This can be done either through the
# P2P layer or RPC layer. Set this to true to use the P2P layer. If false (default), RPC layer
Expand Down
6 changes: 6 additions & 0 deletions packages/rs-drive-abci/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ ABCI_LOG_STDOUT_FORMAT=pretty
ABCI_LOG_STDOUT_COLOR=true

DB_PATH=/tmp/db

CHECKPOINTS_PATH=${DB_PATH}/checkpoints
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

⚠️ Critical: Production environments configured to use temporary storage

The codebase has a proper default production path (/var/lib/dash-platform/data), but it's being overridden by environment files to use temporary storage:

  • .env.mainnet: DB_PATH=/tmp/db
  • .env.testnet: DB_PATH=/tmp/db

This configuration will lead to data loss on system reboot. Update the environment files to use persistent storage paths instead of /tmp/db.

🔗 Analysis chain

Verify database paths for production environments.

Both CHECKPOINTS_PATH and GROVEDB_LATEST_FILE inherit from DB_PATH which is set to /tmp/db. While this is fine for local development, using /tmp in production could lead to data loss as it's typically cleared on system reboot.

Run this script to check if there are any production environment files with similar configuration:

Also applies to: 19-19

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for production environment files and their DB_PATH configuration
# to ensure they don't use temporary directories.

echo "Searching for environment files:"
fd -t f "^\.env\.(production|prod)" packages/rs-drive-abci/

echo -e "\nChecking DB_PATH in found files:"
rg "^DB_PATH=" packages/rs-drive-abci/.env.*

Length of output: 702


Script:

#!/bin/bash
# Check for deployment documentation and configuration
echo "Searching for deployment-related files and documentation:"
fd -t f "^(deploy|docker|k8s|kubernetes|README)" packages/rs-drive-abci/

echo -e "\nChecking content of found files for database path configuration:"
rg -i "(\bdb[_\s-]path|database[_\s-]path|data[_\s-]dir)" packages/rs-drive-abci/

Length of output: 2638


# GroveDB database file
GROVEDB_LATEST_FILE=${DB_PATH}/latest_state

REJECTIONS_PATH=/tmp/rejected

# Cache size for Data Contracts
Expand Down
91 changes: 79 additions & 12 deletions packages/rs-drive-abci/src/abci/app/consensus.rs
Original file line number Diff line number Diff line change
@@ -1,61 +1,100 @@
use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication};
use crate::abci::app::{
BlockExecutionApplication, PlatformApplication, SnapshotFetchingApplication,
SnapshotManagerApplication, TransactionalApplication,
};
use crate::abci::handler;
use crate::abci::handler::error::error_into_exception;
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::platform::Platform;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::CoreRPCLike;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;
use std::fmt::Debug;
use std::sync::RwLock;
use tenderdash_abci::proto::abci as proto;
use dapi_grpc::tonic;

Check warning on line 18 in packages/rs-drive-abci/src/abci/app/consensus.rs

View workflow job for this annotation

GitHub Actions / Rust packages (drive-abci) / Linting

unused import: `dapi_grpc::tonic`

warning: unused import: `dapi_grpc::tonic` --> packages/rs-drive-abci/src/abci/app/consensus.rs:18:5 | 18 | use dapi_grpc::tonic; | ^^^^^^^^^^^^^^^^ | = note: `#[warn(unused_imports)]` on by default

/// AbciApp is an implementation of ABCI Application, as defined by Tenderdash.
///
/// AbciApp implements logic that should be triggered when Tenderdash performs various operations, like
/// creating new proposal or finalizing new block.
pub struct ConsensusAbciApplication<'a, C> {
/// 'p: 'tx, means that Platform must outlive the transaction
pub struct ConsensusAbciApplication<'p, C> {
/// Platform
platform: &'a Platform<C>,
platform: &'p Platform<C>,
/// The current GroveDb transaction
transaction: RwLock<Option<Transaction<'a>>>,
transaction: RwLock<Option<Transaction<'p>>>,
/// The current block execution context
block_execution_context: RwLock<Option<BlockExecutionContext>>,
/// The State sync session
snapshot_fetching_session: RwLock<Option<SnapshotFetchingSession<'p>>>,
/// The snapshot manager
snapshot_manager: SnapshotManager,
}

impl<'a, C> ConsensusAbciApplication<'a, C> {
impl<'p, C> ConsensusAbciApplication<'p, C> {
/// Create new ABCI app
pub fn new(platform: &'a Platform<C>) -> Self {
pub fn new(platform: &'p Platform<C>) -> Self {
let snapshot_manager = SnapshotManager::new(
platform
.config
.state_sync_config
.checkpoints_path
.to_str()
.unwrap()
Comment on lines +46 to +47
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential None from to_str() instead of using unwrap()

Using unwrap() on to_str() may cause a panic if the path contains invalid UTF-8 characters. Consider handling the Option to prevent a possible panic.

Apply this diff to handle the potential None value:

-let snapshot_manager = SnapshotManager::new(
-    platform
-        .config
-        .state_sync_config
-        .checkpoints_path
-        .to_str()
-        .unwrap()
-        .to_string(),
+let checkpoints_path_str = platform
+    .config
+    .state_sync_config
+    .checkpoints_path
+    .to_str()
+    .ok_or_else(|| {
+        Error::InitializationError("Invalid checkpoints path: non-UTF8 characters present".to_string())
+    })?
+    .to_string();
+
+let snapshot_manager = SnapshotManager::new(
+    checkpoints_path_str,
     platform.config.state_sync_config.max_num_snapshots,
     platform.config.state_sync_config.snapshots_frequency,
 );

Committable suggestion skipped: line range outside the PR's diff.

.to_string(),
platform.config.state_sync_config.max_num_snapshots,
platform.config.state_sync_config.snapshots_frequency,
);
Self {
platform,
transaction: Default::default(),
block_execution_context: Default::default(),
snapshot_fetching_session: Default::default(),
snapshot_manager,
}
}
}

impl<'a, C> PlatformApplication<C> for ConsensusAbciApplication<'a, C> {
impl<'p, C> PlatformApplication<C> for ConsensusAbciApplication<'p, C> {

Check warning on line 62 in packages/rs-drive-abci/src/abci/app/consensus.rs

View workflow job for this annotation

GitHub Actions / Rust packages (drive-abci) / Linting

the following explicit lifetimes could be elided: 'p

warning: the following explicit lifetimes could be elided: 'p --> packages/rs-drive-abci/src/abci/app/consensus.rs:62:6 | 62 | impl<'p, C> PlatformApplication<C> for ConsensusAbciApplication<'p, C> { | ^^ ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes = note: `#[warn(clippy::needless_lifetimes)]` on by default help: elide the lifetimes | 62 - impl<'p, C> PlatformApplication<C> for ConsensusAbciApplication<'p, C> { 62 + impl<C> PlatformApplication<C> for ConsensusAbciApplication<'_, C> { |
fn platform(&self) -> &Platform<C> {
self.platform
}
}

impl<'a, C> BlockExecutionApplication for ConsensusAbciApplication<'a, C> {
impl<'p, C> SnapshotManagerApplication for ConsensusAbciApplication<'p, C> {

Check warning on line 68 in packages/rs-drive-abci/src/abci/app/consensus.rs

View workflow job for this annotation

GitHub Actions / Rust packages (drive-abci) / Linting

the following explicit lifetimes could be elided: 'p

warning: the following explicit lifetimes could be elided: 'p --> packages/rs-drive-abci/src/abci/app/consensus.rs:68:6 | 68 | impl<'p, C> SnapshotManagerApplication for ConsensusAbciApplication<'p, C> { | ^^ ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes help: elide the lifetimes | 68 - impl<'p, C> SnapshotManagerApplication for ConsensusAbciApplication<'p, C> { 68 + impl<C> SnapshotManagerApplication for ConsensusAbciApplication<'_, C> { |
fn snapshot_manager(&self) -> &SnapshotManager {
&self.snapshot_manager
}
}

impl<'p, C> SnapshotFetchingApplication<'p, C> for ConsensusAbciApplication<'p, C> {
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'p>>> {
&self.snapshot_fetching_session
}

fn platform(&self) -> &'p Platform<C> {
self.platform
}
}

impl<'p, C> BlockExecutionApplication for ConsensusAbciApplication<'p, C> {

Check warning on line 84 in packages/rs-drive-abci/src/abci/app/consensus.rs

View workflow job for this annotation

GitHub Actions / Rust packages (drive-abci) / Linting

the following explicit lifetimes could be elided: 'p

warning: the following explicit lifetimes could be elided: 'p --> packages/rs-drive-abci/src/abci/app/consensus.rs:84:6 | 84 | impl<'p, C> BlockExecutionApplication for ConsensusAbciApplication<'p, C> { | ^^ ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes help: elide the lifetimes | 84 - impl<'p, C> BlockExecutionApplication for ConsensusAbciApplication<'p, C> { 84 + impl<C> BlockExecutionApplication for ConsensusAbciApplication<'_, C> { |
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>> {
&self.block_execution_context
}
}

impl<'a, C> TransactionalApplication<'a> for ConsensusAbciApplication<'a, C> {
impl<'p, C> TransactionalApplication<'p> for ConsensusAbciApplication<'p, C> {
/// create and store a new transaction
fn start_transaction(&self) {
let transaction = self.platform.drive.grove.start_transaction();
self.transaction.write().unwrap().replace(transaction);
}

fn transaction(&self) -> &RwLock<Option<Transaction<'a>>> {
fn transaction(&self) -> &RwLock<Option<Transaction<'p>>> {
&self.transaction
}

Expand All @@ -77,13 +116,13 @@
}
}

impl<'a, C> Debug for ConsensusAbciApplication<'a, C> {
impl<'p, C> Debug for ConsensusAbciApplication<'p, C> {

Check warning on line 119 in packages/rs-drive-abci/src/abci/app/consensus.rs

View workflow job for this annotation

GitHub Actions / Rust packages (drive-abci) / Linting

the following explicit lifetimes could be elided: 'p

warning: the following explicit lifetimes could be elided: 'p --> packages/rs-drive-abci/src/abci/app/consensus.rs:119:6 | 119 | impl<'p, C> Debug for ConsensusAbciApplication<'p, C> { | ^^ ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes help: elide the lifetimes | 119 - impl<'p, C> Debug for ConsensusAbciApplication<'p, C> { 119 + impl<C> Debug for ConsensusAbciApplication<'_, C> { |
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "<ConsensusAbciApplication>")
}
}

impl<'a, C> tenderdash_abci::Application for ConsensusAbciApplication<'a, C>
impl<'p, C> tenderdash_abci::Application for ConsensusAbciApplication<'p, C>

Check warning on line 125 in packages/rs-drive-abci/src/abci/app/consensus.rs

View workflow job for this annotation

GitHub Actions / Rust packages (drive-abci) / Linting

the following explicit lifetimes could be elided: 'p

warning: the following explicit lifetimes could be elided: 'p --> packages/rs-drive-abci/src/abci/app/consensus.rs:125:6 | 125 | impl<'p, C> tenderdash_abci::Application for ConsensusAbciApplication<'p, C> | ^^ ^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#needless_lifetimes help: elide the lifetimes | 125 - impl<'p, C> tenderdash_abci::Application for ConsensusAbciApplication<'p, C> 125 + impl<C> tenderdash_abci::Application for ConsensusAbciApplication<'_, C> |
where
C: CoreRPCLike,
{
Expand Down Expand Up @@ -149,4 +188,32 @@
) -> Result<proto::ResponseVerifyVoteExtension, proto::ResponseException> {
handler::verify_vote_extension(self, request).map_err(error_into_exception)
}

fn offer_snapshot(
&self,
request: proto::RequestOfferSnapshot,
) -> Result<proto::ResponseOfferSnapshot, proto::ResponseException> {
handler::offer_snapshot(self, request).map_err(error_into_exception)
}

fn apply_snapshot_chunk(
&self,
request: proto::RequestApplySnapshotChunk,
) -> Result<proto::ResponseApplySnapshotChunk, proto::ResponseException> {
handler::apply_snapshot_chunk(self, request).map_err(error_into_exception)
}

fn list_snapshots(
&self,
request: proto::RequestListSnapshots,
) -> Result<proto::ResponseListSnapshots, proto::ResponseException> {
handler::list_snapshots(self, request).map_err(error_into_exception)
}

fn load_snapshot_chunk(
&self,
request: proto::RequestLoadSnapshotChunk,
) -> Result<proto::ResponseLoadSnapshotChunk, proto::ResponseException> {
handler::load_snapshot_chunk(self, request).map_err(error_into_exception)
}
}
67 changes: 66 additions & 1 deletion packages/rs-drive-abci/src/abci/app/full.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
use crate::abci::app::{BlockExecutionApplication, PlatformApplication, TransactionalApplication};
use crate::abci::app::{
BlockExecutionApplication, PlatformApplication, SnapshotFetchingApplication,
SnapshotManagerApplication, TransactionalApplication,
};
use crate::abci::handler;
use crate::abci::handler::error::error_into_exception;
use crate::error::execution::ExecutionError;
use crate::error::Error;
use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::platform::Platform;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::CoreRPCLike;
use dpp::version::PlatformVersion;
use drive::grovedb::Transaction;
Expand All @@ -23,15 +27,32 @@ pub struct FullAbciApplication<'a, C> {
pub transaction: RwLock<Option<Transaction<'a>>>,
/// The current block execution context
pub block_execution_context: RwLock<Option<BlockExecutionContext>>,
/// The State sync session
pub snapshot_fetching_session: RwLock<Option<SnapshotFetchingSession<'a>>>,
/// The snapshot manager
pub snapshot_manager: SnapshotManager,
}

impl<'a, C> FullAbciApplication<'a, C> {
/// Create new ABCI app
pub fn new(platform: &'a Platform<C>) -> Self {
let snapshot_manager = SnapshotManager::new(
platform
.config
.state_sync_config
.checkpoints_path
.to_str()
.unwrap()
.to_string(),
platform.config.state_sync_config.max_num_snapshots,
Comment on lines +45 to +46
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential None from to_str() instead of using unwrap()

Similar to the previous comment, using unwrap() on to_str() can lead to a panic if the path is invalid UTF-8. Handle the Option to ensure robustness.

Apply this diff to handle the potential None value:

-let snapshot_manager = SnapshotManager::new(
-    platform
-        .config
-        .state_sync_config
-        .checkpoints_path
-        .to_str()
-        .unwrap()
-        .to_string(),
+let checkpoints_path_str = platform
+    .config
+    .state_sync_config
+    .checkpoints_path
+    .to_str()
+    .ok_or_else(|| {
+        Error::InitializationError("Invalid checkpoints path: non-UTF8 characters present".to_string())
+    })?
+    .to_string();
+
+let snapshot_manager = SnapshotManager::new(
+    checkpoints_path_str,
     platform.config.state_sync_config.max_num_snapshots,
     platform.config.state_sync_config.snapshots_frequency,
 );

Committable suggestion skipped: line range outside the PR's diff.

platform.config.state_sync_config.snapshots_frequency,
);
Self {
platform,
transaction: Default::default(),
block_execution_context: Default::default(),
snapshot_fetching_session: Default::default(),
snapshot_manager,
}
}
}
Expand All @@ -42,6 +63,22 @@ impl<'a, C> PlatformApplication<C> for FullAbciApplication<'a, C> {
}
}

impl<'a, C> SnapshotManagerApplication for FullAbciApplication<'a, C> {
fn snapshot_manager(&self) -> &SnapshotManager {
&self.snapshot_manager
}
}

impl<'a, C> SnapshotFetchingApplication<'a, C> for FullAbciApplication<'a, C> {
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'a>>> {
&self.snapshot_fetching_session
}

fn platform(&self) -> &'a Platform<C> {
self.platform
}
}

impl<'a, C> BlockExecutionApplication for FullAbciApplication<'a, C> {
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>> {
&self.block_execution_context
Expand Down Expand Up @@ -150,4 +187,32 @@ where
) -> Result<proto::ResponseVerifyVoteExtension, proto::ResponseException> {
handler::verify_vote_extension(self, request).map_err(error_into_exception)
}

fn offer_snapshot(
&self,
request: proto::RequestOfferSnapshot,
) -> Result<proto::ResponseOfferSnapshot, proto::ResponseException> {
handler::offer_snapshot(self, request).map_err(error_into_exception)
}

fn apply_snapshot_chunk(
&self,
request: proto::RequestApplySnapshotChunk,
) -> Result<proto::ResponseApplySnapshotChunk, proto::ResponseException> {
handler::apply_snapshot_chunk(self, request).map_err(error_into_exception)
}

fn list_snapshots(
&self,
request: proto::RequestListSnapshots,
) -> Result<proto::ResponseListSnapshots, proto::ResponseException> {
handler::list_snapshots(self, request).map_err(error_into_exception)
}

fn load_snapshot_chunk(
&self,
request: proto::RequestLoadSnapshotChunk,
) -> Result<proto::ResponseLoadSnapshotChunk, proto::ResponseException> {
handler::load_snapshot_chunk(self, request).map_err(error_into_exception)
}
}
22 changes: 20 additions & 2 deletions packages/rs-drive-abci/src/abci/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,36 @@ mod consensus;
/// Convert state transition execution result into ABCI response
pub mod execution_result;
mod full;
mod state_source;

use crate::execution::types::block_execution_context::BlockExecutionContext;
use crate::platform_types::snapshot::{SnapshotFetchingSession, SnapshotManager};
use crate::rpc::core::DefaultCoreRPC;
pub use check_tx::CheckTxAbciApplication;
pub use consensus::ConsensusAbciApplication;
use dpp::version::PlatformVersion;
pub use full::FullAbciApplication;
pub use state_source::StateSourceAbciApplication;

/// Platform-based ABCI application
pub trait PlatformApplication<C = DefaultCoreRPC> {
/// Returns Platform
fn platform(&self) -> &Platform<C>;
}

/// Platform-based ABCI application
pub trait SnapshotManagerApplication {
/// Returns Platform
fn snapshot_manager(&self) -> &SnapshotManager;
}

/// Transactional ABCI application
pub trait TransactionalApplication<'a> {
pub trait TransactionalApplication<'p> {
/// Creates and keeps a new transaction
fn start_transaction(&self);

/// Returns the current transaction
fn transaction(&self) -> &RwLock<Option<Transaction<'a>>>;
fn transaction(&self) -> &RwLock<Option<Transaction<'p>>>;

/// Commits created transaction
fn commit_transaction(&self, platform_version: &PlatformVersion) -> Result<(), Error>;
Expand All @@ -39,3 +48,12 @@ pub trait BlockExecutionApplication {
/// Returns the current block execution context
fn block_execution_context(&self) -> &RwLock<Option<BlockExecutionContext>>;
}

/// Application that can maintain state sync
pub trait SnapshotFetchingApplication<'p, C> {
/// Returns the current snapshot fetching session
fn snapshot_fetching_session(&self) -> &RwLock<Option<SnapshotFetchingSession<'p>>>;

/// Returns platform reference
fn platform(&self) -> &'p Platform<C>;
}
Loading
Loading