diff --git a/code/crates/actors/src/consensus.rs b/code/crates/actors/src/consensus.rs index b533dd62e..bf4eec102 100644 --- a/code/crates/actors/src/consensus.rs +++ b/code/crates/actors/src/consensus.rs @@ -12,6 +12,7 @@ use tracing::{debug, error, info, warn}; use malachite_blocksync as blocksync; use malachite_common::{ CommitCertificate, Context, Round, SignedExtension, Timeout, TimeoutStep, ValidatorSet, + ValueOrigin, }; use malachite_config::TimeoutConfig; use malachite_consensus::{Effect, Resume, ValueToPropose}; @@ -62,7 +63,7 @@ pub enum Msg { ProposeValue(Ctx::Height, Round, Ctx::Value, Option>), /// Received and assembled the full value proposed by a validator - ReceivedProposedValue(ProposedValue), + ReceivedProposedValue(ProposedValue, ValueOrigin), /// Get the status of the consensus state machine GetStatus(RpcReplyPort>), @@ -311,7 +312,9 @@ where reply_to, }, &myself, - |proposed| Msg::::ReceivedProposedValue(proposed), + |proposed| { + Msg::::ReceivedProposedValue(proposed, ValueOrigin::BlockSync) + }, None, )?; @@ -379,7 +382,7 @@ where reply_to, }, &myself, - |value| Msg::ReceivedProposedValue(value), + |value| Msg::ReceivedProposedValue(value, ValueOrigin::Consensus), None, ) .map_err(|e| { @@ -418,9 +421,9 @@ where Ok(()) } - Msg::ReceivedProposedValue(value) => { + Msg::ReceivedProposedValue(value, origin) => { let result = self - .process_input(&myself, state, ConsensusInput::ProposedValue(value)) + .process_input(&myself, state, ConsensusInput::ProposedValue(value, origin)) .await; if let Err(e) = result { diff --git a/code/crates/common/src/lib.rs b/code/crates/common/src/lib.rs index 6b94909a8..573fcd96c 100644 --- a/code/crates/common/src/lib.rs +++ b/code/crates/common/src/lib.rs @@ -65,5 +65,5 @@ pub use signing::SigningScheme; pub use threshold::{Threshold, ThresholdParam, ThresholdParams}; pub use timeout::{Timeout, TimeoutStep}; pub use validator_set::{Address, Validator, ValidatorSet, VotingPower}; -pub use value::{NilOrVal, Value}; +pub use value::{NilOrVal, Value, ValueOrigin}; pub use vote::{Extension, Vote, VoteType}; diff --git a/code/crates/common/src/value.rs b/code/crates/common/src/value.rs index a89eb50ce..bbb90e657 100644 --- a/code/crates/common/src/value.rs +++ b/code/crates/common/src/value.rs @@ -66,3 +66,12 @@ where /// The ID of the value. fn id(&self) -> Self::Id; } + +/// Protocols that diseminate `Value` +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum ValueOrigin { + /// Block Synchronization protocol + BlockSync, + /// Consensus protocol + Consensus, +} diff --git a/code/crates/consensus/src/handle.rs b/code/crates/consensus/src/handle.rs index e54162596..bf436fba7 100644 --- a/code/crates/consensus/src/handle.rs +++ b/code/crates/consensus/src/handle.rs @@ -50,7 +50,9 @@ where Input::Proposal(proposal) => on_proposal(co, state, metrics, proposal).await, Input::Propose(value) => on_propose(co, state, metrics, value).await, Input::TimeoutElapsed(timeout) => on_timeout_elapsed(co, state, metrics, timeout).await, - Input::ProposedValue(value) => on_proposed_value(co, state, metrics, value).await, + Input::ProposedValue(value, origin) => { + on_proposed_value(co, state, metrics, value, origin).await + } Input::CommitCertificate(certificate) => { on_commit_certificate(co, state, metrics, certificate).await } diff --git a/code/crates/consensus/src/handle/proposal.rs b/code/crates/consensus/src/handle/proposal.rs index 1559fa792..1d9f20dbe 100644 --- a/code/crates/consensus/src/handle/proposal.rs +++ b/code/crates/consensus/src/handle/proposal.rs @@ -151,7 +151,6 @@ where "Received proposal from a non-proposer" ); - // TODO - why when we replay proposals the proposer is wrong return Ok(false); }; diff --git a/code/crates/consensus/src/handle/proposed_value.rs b/code/crates/consensus/src/handle/proposed_value.rs index da98de078..7602d825a 100644 --- a/code/crates/consensus/src/handle/proposed_value.rs +++ b/code/crates/consensus/src/handle/proposed_value.rs @@ -17,6 +17,7 @@ pub async fn on_proposed_value( state: &mut State, metrics: &Metrics, proposed_value: ProposedValue, + origin: ValueOrigin, ) -> Result<(), Error> where Ctx: Context, @@ -27,15 +28,21 @@ where } if state.driver.height() < proposed_value.height { - debug!("Received value for next height, queuing for later"); - state.buffer_input(proposed_value.height, Input::ProposedValue(proposed_value)); + debug!("Received value for higher height, queuing for later"); + + state.buffer_input( + proposed_value.height, + Input::ProposedValue(proposed_value, origin), + ); return Ok(()); } state.store_value(&proposed_value); - - if state.params.value_payload.parts_only() { + // There are two cases where we need to generate an internal Proposal message for consensus to process the full proposal: + // a) In parts-only mode, where we do not get a Proposal message but only the proposal parts + // b) In any mode if the proposed value was provided by BlockSync, where we do net get a Proposal message but only the full value and the certificate + if state.params.value_payload.parts_only() || origin == ValueOrigin::BlockSync { let proposal = Ctx::new_proposal( proposed_value.height, proposed_value.round, diff --git a/code/crates/consensus/src/input.rs b/code/crates/consensus/src/input.rs index 2d1b04f94..985bd481c 100644 --- a/code/crates/consensus/src/input.rs +++ b/code/crates/consensus/src/input.rs @@ -1,6 +1,8 @@ use derive_where::derive_where; -use malachite_common::{CommitCertificate, Context, SignedProposal, SignedVote, Timeout}; +use malachite_common::{ + CommitCertificate, Context, SignedProposal, SignedVote, Timeout, ValueOrigin, +}; use crate::types::ProposedValue; use crate::ValueToPropose; @@ -26,8 +28,9 @@ where /// A timeout has elapsed TimeoutElapsed(Timeout), - /// Received the full proposed value corresponding to a proposal - ProposedValue(ProposedValue), + /// Received the full proposed value corresponding to a proposal. + /// The origin denotes whether the value was received via consensus or BlockSync. + ProposedValue(ProposedValue, ValueOrigin), /// Received a commit certificate from BlockSync CommitCertificate(CommitCertificate), diff --git a/code/crates/consensus/tests/full_proposal.rs b/code/crates/consensus/tests/full_proposal.rs index 029e874ee..a3a3029ee 100644 --- a/code/crates/consensus/tests/full_proposal.rs +++ b/code/crates/consensus/tests/full_proposal.rs @@ -1,4 +1,4 @@ -use malachite_common::{Context, Round, SignedProposal, Validity}; +use malachite_common::{Context, Round, SignedProposal, Validity, ValueOrigin}; use malachite_consensus::{FullProposal, FullProposalKeeper, Input, ProposedValue}; use malachite_test::utils::validators::make_validators; use malachite_test::{Address, Proposal, Value}; @@ -66,15 +66,18 @@ fn val_msg( value: u64, validity: Validity, ) -> Input { - Input::ProposedValue(ProposedValue { - height: Height::new(1), - round: Round::new(round), - valid_round: Round::Nil, - value: Value::new(value), - validity, - validator_address, - extension: Default::default(), - }) + Input::ProposedValue( + ProposedValue { + height: Height::new(1), + round: Round::new(round), + valid_round: Round::Nil, + value: Value::new(value), + validity, + validator_address, + extension: Default::default(), + }, + ValueOrigin::Consensus, + ) } fn prop_at_round_and_value( @@ -275,7 +278,7 @@ fn full_proposal_keeper_tests() { for m in s.input { match m { Input::Proposal(p) => keeper.store_proposal(p), - Input::ProposedValue(v) => keeper.store_value(&v), + Input::ProposedValue(v, _) => keeper.store_value(&v), _ => continue, } } diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index ad72f6ce6..f709bbc59 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -68,6 +68,30 @@ impl HostState { stream_id } + #[tracing::instrument(skip_all, fields(%height, %round))] + pub async fn build_block_from_parts( + &self, + parts: &[Arc], + height: Height, + round: Round, + ) -> Option<(ProposedValue, Block)> { + let value = self.build_value_from_parts(parts, height, round).await?; + + let txes = parts + .iter() + .filter_map(|part| part.as_transactions()) + .flat_map(|txes| txes.to_vec()) + .collect::>(); + + let block = Block { + height, + transactions: Transactions::new(txes), + block_hash: value.value, + }; + + Some((value, block)) + } + #[tracing::instrument(skip_all, fields(%height, %round))] pub async fn build_value_from_parts( &self, @@ -293,7 +317,7 @@ impl StarknetHost { match state.block_store.prune(retain_height).await { Ok(pruned) => { debug!( - %retain_height, pruned = pruned.iter().join(", "), + %retain_height, pruned_heights = pruned.iter().join(", "), "Pruned the block store" ); } @@ -371,6 +395,7 @@ impl Actor for StarknetHost { while let Some(part) = rx_part.recv().await { state.host.part_store.store(height, round, part.clone()); + if state.host.params.value_payload.include_parts() { debug!(%stream_id, %sequence, "Broadcasting proposal part"); @@ -404,17 +429,28 @@ impl Actor for StarknetHost { let parts = state.host.part_store.all_parts(height, round); - let extension = state.host.generate_vote_extension(height, round); + let Some((value, block)) = + state.build_block_from_parts(&parts, height, round).await + else { + error!(%height, %round, "Failed to build block from parts"); + return Ok(()); + }; - if let Some(value) = state.build_value_from_parts(&parts, height, round).await { - reply_to.send(LocallyProposedValue::new( - value.height, - value.round, - value.value, - extension, - ))?; + if let Err(e) = state + .block_store + .store_undecided_block(value.height, value.round, block) + .await + { + error!(%e, %height, %round, "Failed to store the proposed block"); } + reply_to.send(LocallyProposedValue::new( + value.height, + value.round, + value.value, + value.extension, + ))?; + Ok(()) } @@ -545,7 +581,11 @@ impl Actor for StarknetHost { } // Build the block from transaction parts and certificate, and store it - if let Err(e) = state.block_store.store(&certificate, &all_txes).await { + if let Err(e) = state + .block_store + .store_decided_block(&certificate, &all_txes) + .await + { error!(%e, %height, %round, "Failed to store the block"); } diff --git a/code/crates/starknet/host/src/block_store.rs b/code/crates/starknet/host/src/block_store.rs index 73881a1d7..f1e79e134 100644 --- a/code/crates/starknet/host/src/block_store.rs +++ b/code/crates/starknet/host/src/block_store.rs @@ -2,17 +2,20 @@ use std::ops::RangeBounds; use std::path::Path; use std::sync::Arc; -use malachite_common::CommitCertificate; -use malachite_proto::Protobuf; - use prost::Message; use redb::ReadableTable; use thiserror::Error; +use malachite_common::{CommitCertificate, Round}; +use malachite_proto::Protobuf; + use crate::codec; use crate::proto::{self as proto, Error as ProtoError}; use crate::types::MockContext; -use crate::types::{Block, Height, Transaction, Transactions}; +use crate::types::{Block, BlockHash, Height, Transaction, Transactions}; + +mod keys; +use keys::{HeightKey, UndecidedBlockKey}; #[derive(Clone, Debug)] pub struct DecidedBlock { @@ -21,7 +24,7 @@ pub struct DecidedBlock { } fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { - let proto = proto::CommitCertificate::decode(bytes)?; + let proto = proto::sync::CommitCertificate::decode(bytes)?; codec::decode_certificate(proto) } @@ -54,52 +57,14 @@ pub enum StoreError { TaskJoin(#[from] tokio::task::JoinError), } -#[derive(Copy, Clone, Debug)] -struct HeightKey; - -impl redb::Value for HeightKey { - type SelfType<'a> = Height; - - type AsBytes<'a> = Vec; - - fn fixed_width() -> Option { - Some(core::mem::size_of::() * 2) - } - - fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> - where - Self: 'a, - { - let (fork_id, block_number) = <(u64, u64) as redb::Value>::from_bytes(data); - - Height { - fork_id, - block_number, - } - } - - fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> - where - Self: 'a, - Self: 'b, - { - <(u64, u64) as redb::Value>::as_bytes(&(value.fork_id, value.block_number)) - } - - fn type_name() -> redb::TypeName { - redb::TypeName::new("starknet::Height") - } -} +const CERTIFICATES_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("certificates"); -impl redb::Key for HeightKey { - fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { - <(u64, u64) as redb::Key>::compare(data1, data2) - } -} +const DECIDED_BLOCKS_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("decided_blocks"); -const BLOCK_TABLE: redb::TableDefinition> = redb::TableDefinition::new("blocks"); -const CERTIFICATE_TABLE: redb::TableDefinition> = - redb::TableDefinition::new("certificates"); +const UNDECIDED_BLOCKS_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("undecided_blocks"); struct Db { db: redb::Database, @@ -115,12 +80,12 @@ impl Db { fn get(&self, height: Height) -> Result, StoreError> { let tx = self.db.begin_read()?; let block = { - let table = tx.open_table(BLOCK_TABLE)?; + let table = tx.open_table(DECIDED_BLOCKS_TABLE)?; let value = table.get(&height)?; value.and_then(|value| Block::from_bytes(&value.value()).ok()) }; let certificate = { - let table = tx.open_table(CERTIFICATE_TABLE)?; + let table = tx.open_table(CERTIFICATES_TABLE)?; let value = table.get(&height)?; value.and_then(|value| decode_certificate(&value.value()).ok()) }; @@ -132,16 +97,16 @@ impl Db { Ok(decided_block) } - fn insert(&self, decided_block: DecidedBlock) -> Result<(), StoreError> { + fn insert_decided_block(&self, decided_block: DecidedBlock) -> Result<(), StoreError> { let height = decided_block.block.height; let tx = self.db.begin_write()?; { - let mut blocks = tx.open_table(BLOCK_TABLE)?; + let mut blocks = tx.open_table(DECIDED_BLOCKS_TABLE)?; blocks.insert(height, decided_block.block.to_bytes()?.to_vec())?; } { - let mut certificates = tx.open_table(CERTIFICATE_TABLE)?; + let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; certificates.insert(height, encode_certificate(decided_block.certificate)?)?; } tx.commit()?; @@ -149,7 +114,24 @@ impl Db { Ok(()) } - fn range( + fn insert_undecided_block( + &self, + height: Height, + round: Round, + block: Block, + ) -> Result<(), StoreError> { + let key = (height, round, block.block_hash); + let value = codec::encode_block(&block)?; + let tx = self.db.begin_write()?; + { + let mut table = tx.open_table(UNDECIDED_BLOCKS_TABLE)?; + table.insert(key, value)?; + } + tx.commit()?; + Ok(()) + } + + fn height_range
( &self, table: &Table, range: impl RangeBounds, @@ -164,14 +146,39 @@ impl Db { .collect::>()) } + fn undecided_block_range
( + &self, + table: &Table, + range: impl RangeBounds<(Height, Round, BlockHash)>, + ) -> Result, StoreError> + where + Table: redb::ReadableTable>, + { + Ok(table + .range(range)? + .flatten() + .map(|(key, _)| key.value()) + .collect::>()) + } + fn prune(&self, retain_height: Height) -> Result, StoreError> { let tx = self.db.begin_write().unwrap(); let pruned = { - let mut blocks = tx.open_table(BLOCK_TABLE)?; - let mut certificates = tx.open_table(CERTIFICATE_TABLE)?; - let keys = self.range(&blocks, ..retain_height)?; + let mut undecided = tx.open_table(UNDECIDED_BLOCKS_TABLE)?; + let keys = self.undecided_block_range( + &undecided, + ..(retain_height, Round::Nil, BlockHash::new([0; 32])), + )?; + for key in keys { + undecided.remove(key)?; + } + + let mut decided = tx.open_table(DECIDED_BLOCKS_TABLE)?; + let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; + + let keys = self.height_range(&decided, ..retain_height)?; for key in &keys { - blocks.remove(key)?; + decided.remove(key)?; certificates.remove(key)?; } keys @@ -183,14 +190,14 @@ impl Db { fn first_key(&self) -> Option { let tx = self.db.begin_read().unwrap(); - let table = tx.open_table(BLOCK_TABLE).unwrap(); + let table = tx.open_table(DECIDED_BLOCKS_TABLE).unwrap(); let (key, _) = table.first().ok()??; Some(key.value()) } fn last_key(&self) -> Option { let tx = self.db.begin_read().unwrap(); - let table = tx.open_table(BLOCK_TABLE).unwrap(); + let table = tx.open_table(DECIDED_BLOCKS_TABLE).unwrap(); let (key, _) = table.last().ok()??; Some(key.value()) } @@ -198,8 +205,9 @@ impl Db { fn create_tables(&self) -> Result<(), StoreError> { let tx = self.db.begin_write()?; // Implicitly creates the tables if they do not exist yet - let _ = tx.open_table(BLOCK_TABLE)?; - let _ = tx.open_table(CERTIFICATE_TABLE)?; + let _ = tx.open_table(DECIDED_BLOCKS_TABLE)?; + let _ = tx.open_table(CERTIFICATES_TABLE)?; + let _ = tx.open_table(UNDECIDED_BLOCKS_TABLE)?; tx.commit()?; Ok(()) } @@ -231,7 +239,7 @@ impl BlockStore { tokio::task::spawn_blocking(move || db.get(height)).await? } - pub async fn store( + pub async fn store_decided_block( &self, certificate: &CommitCertificate, txes: &[Transaction], @@ -246,7 +254,17 @@ impl BlockStore { }; let db = Arc::clone(&self.db); - tokio::task::spawn_blocking(move || db.insert(decided_block)).await? + tokio::task::spawn_blocking(move || db.insert_decided_block(decided_block)).await? + } + + pub async fn store_undecided_block( + &self, + height: Height, + round: Round, + block: Block, + ) -> Result<(), StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.insert_undecided_block(height, round, block)).await? } pub async fn prune(&self, retain_height: Height) -> Result, StoreError> { diff --git a/code/crates/starknet/host/src/block_store/keys.rs b/code/crates/starknet/host/src/block_store/keys.rs new file mode 100644 index 000000000..22dbf1168 --- /dev/null +++ b/code/crates/starknet/host/src/block_store/keys.rs @@ -0,0 +1,124 @@ +use core::mem::size_of; + +use malachite_common::Round; +use malachite_starknet_p2p_types::{BlockHash, Height}; + +pub type UndecidedBlockKey = (HeightKey, RoundKey, BlockHashKey); + +#[derive(Copy, Clone, Debug)] +pub struct HeightKey; + +impl redb::Value for HeightKey { + type SelfType<'a> = Height; + type AsBytes<'a> = Vec; + + fn fixed_width() -> Option { + Some(size_of::() * 2) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let (fork_id, block_number) = <(u64, u64) as redb::Value>::from_bytes(data); + + Height { + fork_id, + block_number, + } + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + <(u64, u64) as redb::Value>::as_bytes(&(value.fork_id, value.block_number)) + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("starknet::Height") + } +} + +impl redb::Key for HeightKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + <(u64, u64) as redb::Key>::compare(data1, data2) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct RoundKey; + +impl redb::Value for RoundKey { + type SelfType<'a> = Round; + type AsBytes<'a> = [u8; size_of::()]; + + fn fixed_width() -> Option { + Some(size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let round = ::from_bytes(data); + Round::from(round) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + ::as_bytes(&value.as_i64()) + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("Round") + } +} + +impl redb::Key for RoundKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + ::compare(data1, data2) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct BlockHashKey; + +impl redb::Value for BlockHashKey { + type SelfType<'a> = BlockHash; + type AsBytes<'a> = &'a [u8; 32]; + + fn fixed_width() -> Option { + Some(32) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let bytes = <[u8; 32] as redb::Value>::from_bytes(data); + BlockHash::new(bytes) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + value.as_bytes() + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("Round") + } +} + +impl redb::Key for BlockHashKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + <[u8; 32] as redb::Key>::compare(data1, data2) + } +} diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index f6add0572..95c210187 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -1,3 +1,4 @@ +use malachite_starknet_p2p_types::Block; use prost::Message; use malachite_actors::util::codec::NetworkCodec; @@ -11,7 +12,7 @@ use malachite_consensus::SignedConsensusMsg; use malachite_gossip_consensus::Bytes; use crate::proto::consensus_message::Messages; -use crate::proto::{self as proto, ConsensusMessage, Error as ProtoError, Protobuf}; +use crate::proto::{self as proto, Error as ProtoError, Protobuf}; use crate::types::MockContext; use crate::types::{self as p2p, Address, BlockHash, Height, ProposalPart, Vote}; @@ -33,12 +34,11 @@ impl NetworkCodec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - let status = - proto::blocksync::Status::decode(bytes.as_ref()).map_err(ProtoError::Decode)?; + let status = proto::sync::Status::decode(bytes.as_ref()).map_err(ProtoError::Decode)?; let peer_id = status .peer_id - .ok_or_else(|| ProtoError::missing_field::("peer_id"))?; + .ok_or_else(|| ProtoError::missing_field::("peer_id"))?; Ok(blocksync::Status { peer_id: libp2p_identity::PeerId::from_bytes(&peer_id.id) @@ -52,7 +52,7 @@ impl NetworkCodec> for ProtobufCodec { } fn encode(&self, status: blocksync::Status) -> Result { - let proto = proto::blocksync::Status { + let proto = proto::sync::Status { peer_id: Some(proto::PeerId { id: Bytes::from(status.peer_id.to_bytes()), }), @@ -70,7 +70,7 @@ impl NetworkCodec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - let request = proto::blocksync::Request::decode(bytes).map_err(ProtoError::Decode)?; + let request = proto::sync::Request::decode(bytes).map_err(ProtoError::Decode)?; Ok(blocksync::Request { height: Height::new(request.block_number, request.fork_id), @@ -78,7 +78,7 @@ impl NetworkCodec> for ProtobufCodec { } fn encode(&self, request: blocksync::Request) -> Result { - let proto = proto::blocksync::Request { + let proto = proto::sync::Request { block_number: request.height.block_number, fork_id: request.height.fork_id, }; @@ -91,7 +91,7 @@ impl NetworkCodec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - let response = proto::blocksync::Response::decode(bytes).map_err(ProtoError::Decode)?; + let response = proto::sync::Response::decode(bytes).map_err(ProtoError::Decode)?; Ok(blocksync::Response { height: Height::new(response.block_number, response.fork_id), @@ -100,7 +100,7 @@ impl NetworkCodec> for ProtobufCodec { } fn encode(&self, response: blocksync::Response) -> Result { - let proto = proto::blocksync::Response { + let proto = proto::sync::Response { block_number: response.height.block_number, fork_id: response.height.fork_id, block: response.block.map(encode_synced_block).transpose()?, @@ -114,15 +114,15 @@ impl NetworkCodec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - let proto = ConsensusMessage::decode(bytes)?; + let proto = proto::ConsensusMessage::decode(bytes)?; let proto_signature = proto .signature - .ok_or_else(|| ProtoError::missing_field::("signature"))?; + .ok_or_else(|| ProtoError::missing_field::("signature"))?; let message = proto .messages - .ok_or_else(|| ProtoError::missing_field::("messages"))?; + .ok_or_else(|| ProtoError::missing_field::("messages"))?; let signature = p2p::Signature::from_proto(proto_signature)?; @@ -137,11 +137,11 @@ impl NetworkCodec> for ProtobufCodec { fn encode(&self, msg: SignedConsensusMsg) -> Result { let message = match msg { - SignedConsensusMsg::Vote(v) => ConsensusMessage { + SignedConsensusMsg::Vote(v) => proto::ConsensusMessage { messages: Some(Messages::Vote(v.to_proto()?)), signature: Some(v.signature.to_proto()?), }, - SignedConsensusMsg::Proposal(p) => ConsensusMessage { + SignedConsensusMsg::Proposal(p) => proto::ConsensusMessage { messages: Some(Messages::Proposal(p.to_proto()?)), signature: Some(p.signature.to_proto()?), }, @@ -185,9 +185,33 @@ where } } +pub(crate) fn encode_synced_block( + synced_block: blocksync::SyncedBlock, +) -> Result { + Ok(proto::sync::SyncedBlock { + block_bytes: synced_block.block_bytes, + certificate: Some(encode_certificate(synced_block.certificate)?), + }) +} + +pub(crate) fn decode_synced_block( + proto: proto::sync::SyncedBlock, +) -> Result, ProtoError> { + let Some(certificate) = proto.certificate else { + return Err(ProtoError::missing_field::( + "certificate", + )); + }; + + Ok(blocksync::SyncedBlock { + block_bytes: proto.block_bytes, + certificate: decode_certificate(certificate)?, + }) +} + pub(crate) fn encode_aggregate_signature( aggregated_signature: AggregatedSignature, -) -> Result { +) -> Result { let signatures = aggregated_signature .signatures .into_iter() @@ -195,50 +219,19 @@ pub(crate) fn encode_aggregate_signature( let validator_address = s.address.to_proto()?; let signature = s.signature.to_proto()?; - Ok(proto::CommitSignature { + Ok(proto::sync::CommitSignature { validator_address: Some(validator_address), signature: Some(signature), - extension: s - .extension - .map(|e| -> Result<_, ProtoError> { - Ok(proto::Extension { - data: e.message.data, - signature: Some(e.signature.to_proto()?), - }) - }) - .transpose()?, + extension: s.extension.map(encode_extension).transpose()?, }) }) .collect::>()?; - Ok(proto::AggregatedSignature { signatures }) -} - -pub(crate) fn encode_certificate( - certificate: CommitCertificate, -) -> Result { - Ok(proto::CommitCertificate { - fork_id: certificate.height.fork_id, - block_number: certificate.height.block_number, - round: certificate.round.as_u32().expect("round should not be nil"), - block_hash: Some(certificate.value_id.to_proto()?), - aggregated_signature: Some(encode_aggregate_signature( - certificate.aggregated_signature, - )?), - }) -} - -pub(crate) fn encode_synced_block( - synced_block: blocksync::SyncedBlock, -) -> Result { - Ok(proto::blocksync::SyncedBlock { - block_bytes: synced_block.block_bytes, - certificate: Some(encode_certificate(synced_block.certificate)?), - }) + Ok(proto::sync::AggregatedSignature { signatures }) } pub(crate) fn decode_aggregated_signature( - signature: proto::AggregatedSignature, + signature: proto::sync::AggregatedSignature, ) -> Result, ProtoError> { let signatures = signature .signatures @@ -246,28 +239,19 @@ pub(crate) fn decode_aggregated_signature( .map(|s| { let signature = s .signature - .ok_or_else(|| ProtoError::missing_field::("signature")) + .ok_or_else(|| { + ProtoError::missing_field::("signature") + }) .and_then(p2p::Signature::from_proto)?; let address = s .validator_address .ok_or_else(|| { - ProtoError::missing_field::("validator_address") + ProtoError::missing_field::("validator_address") }) .and_then(Address::from_proto)?; - let extension = s - .extension - .map(|e| -> Result<_, ProtoError> { - let extension = Extension::from(e.data); - let signature = e - .signature - .ok_or_else(|| ProtoError::missing_field::("signature")) - .and_then(p2p::Signature::from_proto)?; - - Ok(SignedExtension::new(extension, signature)) - }) - .transpose()?; + let extension = s.extension.map(decode_extension).transpose()?; Ok(CommitSignature { address, @@ -280,13 +264,48 @@ pub(crate) fn decode_aggregated_signature( Ok(AggregatedSignature { signatures }) } +pub(crate) fn encode_extension( + ext: SignedExtension, +) -> Result { + Ok(proto::Extension { + data: ext.message.data, + signature: Some(ext.signature.to_proto()?), + }) +} + +pub(crate) fn decode_extension( + ext: proto::Extension, +) -> Result, ProtoError> { + let extension = Extension::from(ext.data); + let signature = ext + .signature + .ok_or_else(|| ProtoError::missing_field::("signature")) + .and_then(p2p::Signature::from_proto)?; + + Ok(SignedExtension::new(extension, signature)) +} + +pub(crate) fn encode_certificate( + certificate: CommitCertificate, +) -> Result { + Ok(proto::sync::CommitCertificate { + fork_id: certificate.height.fork_id, + block_number: certificate.height.block_number, + round: certificate.round.as_u32().expect("round should not be nil"), + block_hash: Some(certificate.value_id.to_proto()?), + aggregated_signature: Some(encode_aggregate_signature( + certificate.aggregated_signature, + )?), + }) +} + pub(crate) fn decode_certificate( - certificate: proto::CommitCertificate, + certificate: proto::sync::CommitCertificate, ) -> Result, ProtoError> { let value_id = if let Some(block_hash) = certificate.block_hash { BlockHash::from_proto(block_hash)? } else { - return Err(ProtoError::missing_field::( + return Err(ProtoError::missing_field::( "block_hash", )); }; @@ -294,7 +313,7 @@ pub(crate) fn decode_certificate( let aggregated_signature = if let Some(agg_sig) = certificate.aggregated_signature { decode_aggregated_signature(agg_sig)? } else { - return Err(ProtoError::missing_field::( + return Err(ProtoError::missing_field::( "aggregated_signature", )); }; @@ -309,19 +328,13 @@ pub(crate) fn decode_certificate( Ok(certificate) } -pub(crate) fn decode_synced_block( - synced_block: proto::blocksync::SyncedBlock, -) -> Result, ProtoError> { - let certificate = if let Some(certificate) = synced_block.certificate { - certificate - } else { - return Err(ProtoError::missing_field::( - "certificate", - )); +pub(crate) fn encode_block(block: &Block) -> Result, ProtoError> { + let proto = proto::sync::Block { + fork_id: block.height.fork_id, + block_number: block.height.block_number, + transactions: Some(block.transactions.to_proto()?), + block_hash: Some(block.block_hash.to_proto()?), }; - Ok(blocksync::SyncedBlock { - block_bytes: synced_block.block_bytes, - certificate: decode_certificate(certificate)?, - }) + Ok(proto.encode_to_vec()) } diff --git a/code/crates/starknet/p2p-proto/build.rs b/code/crates/starknet/p2p-proto/build.rs index 4a5fac9a4..3d026ee65 100644 --- a/code/crates/starknet/p2p-proto/build.rs +++ b/code/crates/starknet/p2p-proto/build.rs @@ -1,6 +1,6 @@ fn main() -> Result<(), Box> { let protos = &[ - "./proto/blocksync.proto", + "./proto/sync.proto", "./proto/p2p/proto/common.proto", "./proto/p2p/proto/header.proto", "./proto/p2p/proto/transaction.proto", diff --git a/code/crates/starknet/p2p-proto/proto/blocksync.proto b/code/crates/starknet/p2p-proto/proto/blocksync.proto deleted file mode 100644 index d851d5d52..000000000 --- a/code/crates/starknet/p2p-proto/proto/blocksync.proto +++ /dev/null @@ -1,38 +0,0 @@ -syntax = "proto3"; - -package blocksync; - -import "p2p/proto/common.proto"; -import "p2p/proto/consensus.proto"; -import "p2p/proto/transaction.proto"; - -message Status { - PeerID peer_id = 1; - uint64 block_number = 2; - uint64 fork_id = 3; - uint64 earliest_block_number = 4; - uint64 earliest_fork_id = 5; -} - -message Request { - uint64 block_number = 1; - uint64 fork_id = 2; -} - -message Response { - uint64 block_number = 1; - uint64 fork_id = 2; - SyncedBlock block = 3; -} - -message SyncedBlock { - bytes block_bytes = 1; - CommitCertificate certificate = 2; -} - -message Block { - uint64 fork_id = 1; - uint64 block_number = 2; - Transactions transactions = 3; - Hash block_hash = 4; -} diff --git a/code/crates/starknet/p2p-proto/proto/p2p/proto/consensus.proto b/code/crates/starknet/p2p-proto/proto/p2p/proto/consensus.proto index 89359584d..9715307ca 100644 --- a/code/crates/starknet/p2p-proto/proto/p2p/proto/consensus.proto +++ b/code/crates/starknet/p2p-proto/proto/p2p/proto/consensus.proto @@ -94,24 +94,3 @@ message Extension { ConsensusSignature signature = 2; } -// ADDED -message CommitSignature { - // TODO - add flag (no vote, nil, value?) - Address validator_address = 1; - ConsensusSignature signature = 2; - optional Extension extension = 3; -} - -// ADDED -message AggregatedSignature { - repeated CommitSignature signatures = 1; -} - -// ADDED -message CommitCertificate { - uint64 fork_id = 1; - uint64 block_number = 2; - uint32 round = 3; - Hash block_hash = 4; - AggregatedSignature aggregated_signature = 5; -} diff --git a/code/crates/starknet/p2p-proto/proto/sync.proto b/code/crates/starknet/p2p-proto/proto/sync.proto new file mode 100644 index 000000000..316e4b1fa --- /dev/null +++ b/code/crates/starknet/p2p-proto/proto/sync.proto @@ -0,0 +1,68 @@ +syntax = "proto3"; + +package sync; + +import "p2p/proto/common.proto"; +import "p2p/proto/consensus.proto"; +import "p2p/proto/transaction.proto"; + +message Status { + PeerID peer_id = 1; + uint64 block_number = 2; + uint64 fork_id = 3; + uint64 earliest_block_number = 4; + uint64 earliest_fork_id = 5; +} + +message Request { + uint64 block_number = 1; + uint64 fork_id = 2; +} + +message Response { + uint64 block_number = 1; + uint64 fork_id = 2; + SyncedBlock block = 3; +} + +message SyncedBlock { + bytes block_bytes = 1; + CommitCertificate certificate = 2; +} + +message Block { + uint64 fork_id = 1; + uint64 block_number = 2; + Transactions transactions = 3; + Hash block_hash = 4; +} + +message CommitSignature { + // TODO - add flag (no vote, nil, value?) + Address validator_address = 1; + ConsensusSignature signature = 2; + optional Extension extension = 3; +} + +message AggregatedSignature { + repeated CommitSignature signatures = 1; +} + +message CommitCertificate { + uint64 fork_id = 1; + uint64 block_number = 2; + uint32 round = 3; + Hash block_hash = 4; + AggregatedSignature aggregated_signature = 5; +} + +message ProposedValue { + uint64 fork_id = 1; + uint64 block_number = 2; + uint32 round = 3; + optional uint32 valid_round = 4; + Address proposer = 5; + bytes value = 6; + bool validity = 7; + optional Extension extension = 8; +} diff --git a/code/crates/starknet/p2p-proto/src/lib.rs b/code/crates/starknet/p2p-proto/src/lib.rs index 276bf5a6e..ea999161a 100644 --- a/code/crates/starknet/p2p-proto/src/lib.rs +++ b/code/crates/starknet/p2p-proto/src/lib.rs @@ -2,6 +2,6 @@ include!(concat!(env!("OUT_DIR"), "/p2p.rs")); -pub mod blocksync { - include!(concat!(env!("OUT_DIR"), "/blocksync.rs")); +pub mod sync { + include!(concat!(env!("OUT_DIR"), "/sync.rs")); } diff --git a/code/crates/starknet/p2p-types/src/block.rs b/code/crates/starknet/p2p-types/src/block.rs index cfc87663d..a6f7297d9 100644 --- a/code/crates/starknet/p2p-types/src/block.rs +++ b/code/crates/starknet/p2p-types/src/block.rs @@ -11,7 +11,7 @@ pub struct Block { } impl Protobuf for Block { - type Proto = proto::blocksync::Block; + type Proto = proto::sync::Block; fn from_proto(proto: Self::Proto) -> Result { let transactions = proto diff --git a/code/crates/starknet/test/src/lib.rs b/code/crates/starknet/test/src/lib.rs index 0dd5283c9..68d3ee25d 100644 --- a/code/crates/starknet/test/src/lib.rs +++ b/code/crates/starknet/test/src/lib.rs @@ -544,7 +544,7 @@ pub fn make_node_config(test: &Test, i: usize) -> NodeConfig gossip_batch_size: 100, }, blocksync: BlockSyncConfig { - enabled: false, + enabled: true, status_update_interval: Duration::from_secs(2), request_timeout: Duration::from_secs(5), }, diff --git a/code/crates/starknet/test/tests/blocksync.rs b/code/crates/starknet/test/tests/blocksync.rs index 1b8f542eb..86f6ba84c 100644 --- a/code/crates/starknet/test/tests/blocksync.rs +++ b/code/crates/starknet/test/tests/blocksync.rs @@ -1,9 +1,9 @@ use std::time::Duration; +use malachite_config::ValuePayload; use malachite_starknet_test::{Test, TestNode, TestParams}; -#[tokio::test] -pub async fn crash_restart_from_1() { +pub async fn crash_restart_from_start(params: TestParams) { const HEIGHT: u64 = 10; // Node 1 starts with 10 voting power. @@ -40,12 +40,42 @@ pub async fn crash_restart_from_1() { Duration::from_secs(60), // Timeout for the whole test TestParams { enable_blocksync: true, // Enable BlockSync - ..Default::default() + ..params }, ) .await } +#[tokio::test] +pub async fn crash_restart_from_start_parts_only() { + let params = TestParams { + value_payload: ValuePayload::PartsOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_start_proposal_only() { + let params = TestParams { + value_payload: ValuePayload::ProposalOnly, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + +#[tokio::test] +pub async fn crash_restart_from_start_proposal_and_parts() { + let params = TestParams { + value_payload: ValuePayload::ProposalAndParts, + ..Default::default() + }; + + crash_restart_from_start(params).await +} + #[tokio::test] pub async fn crash_restart_from_latest() { const HEIGHT: u64 = 10; diff --git a/code/scripts/spawn.bash b/code/scripts/spawn.bash index 4fd1be371..ecf344408 100755 --- a/code/scripts/spawn.bash +++ b/code/scripts/spawn.bash @@ -38,7 +38,7 @@ fi # Environment variables export MALACHITE__CONSENSUS__P2P__PROTOCOL__TYPE="gossipsub" -export MALACHITE__CONSENSUS__VALUE_PAYLOAD="parts-only" +export MALACHITE__CONSENSUS__VALUE_PAYLOAD="proposal-and-parts" export MALACHITE__CONSENSUS__MAX_BLOCK_SIZE="50KiB" export MALACHITE__CONSENSUS__TIMEOUT_PROPOSE="5s" export MALACHITE__CONSENSUS__TIMEOUT_PROPOSE_DELTA="1s" diff --git a/specs/quint/specs/blocksync/bsyncWithConsensus.qnt b/specs/quint/specs/blocksync/bsyncWithConsensus.qnt index cd85a21f9..6261e1803 100644 --- a/specs/quint/specs/blocksync/bsyncWithConsensus.qnt +++ b/specs/quint/specs/blocksync/bsyncWithConsensus.qnt @@ -30,6 +30,10 @@ module bsyncWithConsensus { // With the default `init` action, it is unlikely to observe this scenario, // even with 200 steps (around 30 minutes) execution: // $ quint run --invariant anyClientOutputWitness bsyncWithConsensus.qnt + // Therefore we have a special init action: + // $ quint run --invariant anyClientOutputWitness bsyncWithConsensus.qnt --init initSetup --step stepWithBlockSync --maxSteps 60 + // Use --seed=0x1060f6cddc9cb5 to reproduce. + val anyClientOutputWitness = Correct.forall(p => system.get(p).incomingSyncCertificates == Set()) @@ -88,6 +92,10 @@ module bsyncWithConsensus { newHeightActionAll(v, validatorSet, system.get(v).es.chain.length()) } + action syncStep = + nondet v = oneOf(Correct) + pureSyncStep(v, unchangedAll) + // // Interesting scenarios // @@ -132,10 +140,6 @@ module bsyncWithConsensus { } } - action syncStep = - nondet v = oneOf(Correct) - pureSyncStep(v, unchangedAll) - /// a simple scenario where v4 starts height h run syncCycle(h) = newHeightActionAll("v4", validatorSet, h) @@ -168,6 +172,18 @@ module bsyncWithConsensus { .expect(system.get("v4").es.cs.height == system.get("v2").es.cs.height) // and now v4 has synced ! + /// initSetup setups two servers (v2, v3) at height 4, which broadcast their + /// status. Client v4 learns the status and starts syncing from height 0. + action initSetup = + initHeight(4) + .then(syncUpdateServer("v2")) + .then(syncUpdateServer("v3")) + .then(all{unchangedAll, syncStatusStep("v2")}) + .then(all{unchangedAll, syncStatusStep("v3")}) + .then(all{unchangedAll, syncDeliverStatus("v4")}) + .then(all{unchangedAll, syncDeliverStatus("v4")}) + .then(newHeightActionAll("v4", validatorSet, 0)) + run lausanneRetreat = initHeight(2) // FIXME: I had to put it here, instead of syncCycle(h)