From d68bb79fd2acfcda6cb74fa75a65c1b164c67492 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 25 Nov 2024 19:24:34 +0100 Subject: [PATCH 1/2] chore(code/host): Store undecided blocks before passing them on to consensus (#569) --- code/crates/starknet/host/src/actor.rs | 60 +++++- code/crates/starknet/host/src/block_store.rs | 148 ++++++++------- .../starknet/host/src/block_store/keys.rs | 124 +++++++++++++ code/crates/starknet/host/src/codec.rs | 173 ++++++++++-------- code/crates/starknet/p2p-proto/build.rs | 2 +- .../starknet/p2p-proto/proto/blocksync.proto | 38 ---- .../p2p-proto/proto/p2p/proto/consensus.proto | 21 --- .../starknet/p2p-proto/proto/sync.proto | 68 +++++++ code/crates/starknet/p2p-proto/src/lib.rs | 4 +- code/crates/starknet/p2p-types/src/block.rs | 2 +- 10 files changed, 422 insertions(+), 218 deletions(-) create mode 100644 code/crates/starknet/host/src/block_store/keys.rs delete mode 100644 code/crates/starknet/p2p-proto/proto/blocksync.proto create mode 100644 code/crates/starknet/p2p-proto/proto/sync.proto diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index ad72f6ce6..f709bbc59 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -68,6 +68,30 @@ impl HostState { stream_id } + #[tracing::instrument(skip_all, fields(%height, %round))] + pub async fn build_block_from_parts( + &self, + parts: &[Arc], + height: Height, + round: Round, + ) -> Option<(ProposedValue, Block)> { + let value = self.build_value_from_parts(parts, height, round).await?; + + let txes = parts + .iter() + .filter_map(|part| part.as_transactions()) + .flat_map(|txes| txes.to_vec()) + .collect::>(); + + let block = Block { + height, + transactions: Transactions::new(txes), + block_hash: value.value, + }; + + Some((value, block)) + } + #[tracing::instrument(skip_all, fields(%height, %round))] pub async fn build_value_from_parts( &self, @@ -293,7 +317,7 @@ impl StarknetHost { match state.block_store.prune(retain_height).await { Ok(pruned) => { debug!( - %retain_height, pruned = pruned.iter().join(", "), + %retain_height, pruned_heights = pruned.iter().join(", "), "Pruned the block store" ); } @@ -371,6 +395,7 @@ impl Actor for StarknetHost { while let Some(part) = rx_part.recv().await { state.host.part_store.store(height, round, part.clone()); + if state.host.params.value_payload.include_parts() { debug!(%stream_id, %sequence, "Broadcasting proposal part"); @@ -404,17 +429,28 @@ impl Actor for StarknetHost { let parts = state.host.part_store.all_parts(height, round); - let extension = state.host.generate_vote_extension(height, round); + let Some((value, block)) = + state.build_block_from_parts(&parts, height, round).await + else { + error!(%height, %round, "Failed to build block from parts"); + return Ok(()); + }; - if let Some(value) = state.build_value_from_parts(&parts, height, round).await { - reply_to.send(LocallyProposedValue::new( - value.height, - value.round, - value.value, - extension, - ))?; + if let Err(e) = state + .block_store + .store_undecided_block(value.height, value.round, block) + .await + { + error!(%e, %height, %round, "Failed to store the proposed block"); } + reply_to.send(LocallyProposedValue::new( + value.height, + value.round, + value.value, + value.extension, + ))?; + Ok(()) } @@ -545,7 +581,11 @@ impl Actor for StarknetHost { } // Build the block from transaction parts and certificate, and store it - if let Err(e) = state.block_store.store(&certificate, &all_txes).await { + if let Err(e) = state + .block_store + .store_decided_block(&certificate, &all_txes) + .await + { error!(%e, %height, %round, "Failed to store the block"); } diff --git a/code/crates/starknet/host/src/block_store.rs b/code/crates/starknet/host/src/block_store.rs index 73881a1d7..f1e79e134 100644 --- a/code/crates/starknet/host/src/block_store.rs +++ b/code/crates/starknet/host/src/block_store.rs @@ -2,17 +2,20 @@ use std::ops::RangeBounds; use std::path::Path; use std::sync::Arc; -use malachite_common::CommitCertificate; -use malachite_proto::Protobuf; - use prost::Message; use redb::ReadableTable; use thiserror::Error; +use malachite_common::{CommitCertificate, Round}; +use malachite_proto::Protobuf; + use crate::codec; use crate::proto::{self as proto, Error as ProtoError}; use crate::types::MockContext; -use crate::types::{Block, Height, Transaction, Transactions}; +use crate::types::{Block, BlockHash, Height, Transaction, Transactions}; + +mod keys; +use keys::{HeightKey, UndecidedBlockKey}; #[derive(Clone, Debug)] pub struct DecidedBlock { @@ -21,7 +24,7 @@ pub struct DecidedBlock { } fn decode_certificate(bytes: &[u8]) -> Result, ProtoError> { - let proto = proto::CommitCertificate::decode(bytes)?; + let proto = proto::sync::CommitCertificate::decode(bytes)?; codec::decode_certificate(proto) } @@ -54,52 +57,14 @@ pub enum StoreError { TaskJoin(#[from] tokio::task::JoinError), } -#[derive(Copy, Clone, Debug)] -struct HeightKey; - -impl redb::Value for HeightKey { - type SelfType<'a> = Height; - - type AsBytes<'a> = Vec; - - fn fixed_width() -> Option { - Some(core::mem::size_of::() * 2) - } - - fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> - where - Self: 'a, - { - let (fork_id, block_number) = <(u64, u64) as redb::Value>::from_bytes(data); - - Height { - fork_id, - block_number, - } - } - - fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> - where - Self: 'a, - Self: 'b, - { - <(u64, u64) as redb::Value>::as_bytes(&(value.fork_id, value.block_number)) - } - - fn type_name() -> redb::TypeName { - redb::TypeName::new("starknet::Height") - } -} +const CERTIFICATES_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("certificates"); -impl redb::Key for HeightKey { - fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { - <(u64, u64) as redb::Key>::compare(data1, data2) - } -} +const DECIDED_BLOCKS_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("decided_blocks"); -const BLOCK_TABLE: redb::TableDefinition> = redb::TableDefinition::new("blocks"); -const CERTIFICATE_TABLE: redb::TableDefinition> = - redb::TableDefinition::new("certificates"); +const UNDECIDED_BLOCKS_TABLE: redb::TableDefinition> = + redb::TableDefinition::new("undecided_blocks"); struct Db { db: redb::Database, @@ -115,12 +80,12 @@ impl Db { fn get(&self, height: Height) -> Result, StoreError> { let tx = self.db.begin_read()?; let block = { - let table = tx.open_table(BLOCK_TABLE)?; + let table = tx.open_table(DECIDED_BLOCKS_TABLE)?; let value = table.get(&height)?; value.and_then(|value| Block::from_bytes(&value.value()).ok()) }; let certificate = { - let table = tx.open_table(CERTIFICATE_TABLE)?; + let table = tx.open_table(CERTIFICATES_TABLE)?; let value = table.get(&height)?; value.and_then(|value| decode_certificate(&value.value()).ok()) }; @@ -132,16 +97,16 @@ impl Db { Ok(decided_block) } - fn insert(&self, decided_block: DecidedBlock) -> Result<(), StoreError> { + fn insert_decided_block(&self, decided_block: DecidedBlock) -> Result<(), StoreError> { let height = decided_block.block.height; let tx = self.db.begin_write()?; { - let mut blocks = tx.open_table(BLOCK_TABLE)?; + let mut blocks = tx.open_table(DECIDED_BLOCKS_TABLE)?; blocks.insert(height, decided_block.block.to_bytes()?.to_vec())?; } { - let mut certificates = tx.open_table(CERTIFICATE_TABLE)?; + let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; certificates.insert(height, encode_certificate(decided_block.certificate)?)?; } tx.commit()?; @@ -149,7 +114,24 @@ impl Db { Ok(()) } - fn range( + fn insert_undecided_block( + &self, + height: Height, + round: Round, + block: Block, + ) -> Result<(), StoreError> { + let key = (height, round, block.block_hash); + let value = codec::encode_block(&block)?; + let tx = self.db.begin_write()?; + { + let mut table = tx.open_table(UNDECIDED_BLOCKS_TABLE)?; + table.insert(key, value)?; + } + tx.commit()?; + Ok(()) + } + + fn height_range
( &self, table: &Table, range: impl RangeBounds, @@ -164,14 +146,39 @@ impl Db { .collect::>()) } + fn undecided_block_range
( + &self, + table: &Table, + range: impl RangeBounds<(Height, Round, BlockHash)>, + ) -> Result, StoreError> + where + Table: redb::ReadableTable>, + { + Ok(table + .range(range)? + .flatten() + .map(|(key, _)| key.value()) + .collect::>()) + } + fn prune(&self, retain_height: Height) -> Result, StoreError> { let tx = self.db.begin_write().unwrap(); let pruned = { - let mut blocks = tx.open_table(BLOCK_TABLE)?; - let mut certificates = tx.open_table(CERTIFICATE_TABLE)?; - let keys = self.range(&blocks, ..retain_height)?; + let mut undecided = tx.open_table(UNDECIDED_BLOCKS_TABLE)?; + let keys = self.undecided_block_range( + &undecided, + ..(retain_height, Round::Nil, BlockHash::new([0; 32])), + )?; + for key in keys { + undecided.remove(key)?; + } + + let mut decided = tx.open_table(DECIDED_BLOCKS_TABLE)?; + let mut certificates = tx.open_table(CERTIFICATES_TABLE)?; + + let keys = self.height_range(&decided, ..retain_height)?; for key in &keys { - blocks.remove(key)?; + decided.remove(key)?; certificates.remove(key)?; } keys @@ -183,14 +190,14 @@ impl Db { fn first_key(&self) -> Option { let tx = self.db.begin_read().unwrap(); - let table = tx.open_table(BLOCK_TABLE).unwrap(); + let table = tx.open_table(DECIDED_BLOCKS_TABLE).unwrap(); let (key, _) = table.first().ok()??; Some(key.value()) } fn last_key(&self) -> Option { let tx = self.db.begin_read().unwrap(); - let table = tx.open_table(BLOCK_TABLE).unwrap(); + let table = tx.open_table(DECIDED_BLOCKS_TABLE).unwrap(); let (key, _) = table.last().ok()??; Some(key.value()) } @@ -198,8 +205,9 @@ impl Db { fn create_tables(&self) -> Result<(), StoreError> { let tx = self.db.begin_write()?; // Implicitly creates the tables if they do not exist yet - let _ = tx.open_table(BLOCK_TABLE)?; - let _ = tx.open_table(CERTIFICATE_TABLE)?; + let _ = tx.open_table(DECIDED_BLOCKS_TABLE)?; + let _ = tx.open_table(CERTIFICATES_TABLE)?; + let _ = tx.open_table(UNDECIDED_BLOCKS_TABLE)?; tx.commit()?; Ok(()) } @@ -231,7 +239,7 @@ impl BlockStore { tokio::task::spawn_blocking(move || db.get(height)).await? } - pub async fn store( + pub async fn store_decided_block( &self, certificate: &CommitCertificate, txes: &[Transaction], @@ -246,7 +254,17 @@ impl BlockStore { }; let db = Arc::clone(&self.db); - tokio::task::spawn_blocking(move || db.insert(decided_block)).await? + tokio::task::spawn_blocking(move || db.insert_decided_block(decided_block)).await? + } + + pub async fn store_undecided_block( + &self, + height: Height, + round: Round, + block: Block, + ) -> Result<(), StoreError> { + let db = Arc::clone(&self.db); + tokio::task::spawn_blocking(move || db.insert_undecided_block(height, round, block)).await? } pub async fn prune(&self, retain_height: Height) -> Result, StoreError> { diff --git a/code/crates/starknet/host/src/block_store/keys.rs b/code/crates/starknet/host/src/block_store/keys.rs new file mode 100644 index 000000000..22dbf1168 --- /dev/null +++ b/code/crates/starknet/host/src/block_store/keys.rs @@ -0,0 +1,124 @@ +use core::mem::size_of; + +use malachite_common::Round; +use malachite_starknet_p2p_types::{BlockHash, Height}; + +pub type UndecidedBlockKey = (HeightKey, RoundKey, BlockHashKey); + +#[derive(Copy, Clone, Debug)] +pub struct HeightKey; + +impl redb::Value for HeightKey { + type SelfType<'a> = Height; + type AsBytes<'a> = Vec; + + fn fixed_width() -> Option { + Some(size_of::() * 2) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let (fork_id, block_number) = <(u64, u64) as redb::Value>::from_bytes(data); + + Height { + fork_id, + block_number, + } + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + <(u64, u64) as redb::Value>::as_bytes(&(value.fork_id, value.block_number)) + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("starknet::Height") + } +} + +impl redb::Key for HeightKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + <(u64, u64) as redb::Key>::compare(data1, data2) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct RoundKey; + +impl redb::Value for RoundKey { + type SelfType<'a> = Round; + type AsBytes<'a> = [u8; size_of::()]; + + fn fixed_width() -> Option { + Some(size_of::()) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let round = ::from_bytes(data); + Round::from(round) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + ::as_bytes(&value.as_i64()) + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("Round") + } +} + +impl redb::Key for RoundKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + ::compare(data1, data2) + } +} + +#[derive(Copy, Clone, Debug)] +pub struct BlockHashKey; + +impl redb::Value for BlockHashKey { + type SelfType<'a> = BlockHash; + type AsBytes<'a> = &'a [u8; 32]; + + fn fixed_width() -> Option { + Some(32) + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + let bytes = <[u8; 32] as redb::Value>::from_bytes(data); + BlockHash::new(bytes) + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + value.as_bytes() + } + + fn type_name() -> redb::TypeName { + redb::TypeName::new("Round") + } +} + +impl redb::Key for BlockHashKey { + fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering { + <[u8; 32] as redb::Key>::compare(data1, data2) + } +} diff --git a/code/crates/starknet/host/src/codec.rs b/code/crates/starknet/host/src/codec.rs index f6add0572..95c210187 100644 --- a/code/crates/starknet/host/src/codec.rs +++ b/code/crates/starknet/host/src/codec.rs @@ -1,3 +1,4 @@ +use malachite_starknet_p2p_types::Block; use prost::Message; use malachite_actors::util::codec::NetworkCodec; @@ -11,7 +12,7 @@ use malachite_consensus::SignedConsensusMsg; use malachite_gossip_consensus::Bytes; use crate::proto::consensus_message::Messages; -use crate::proto::{self as proto, ConsensusMessage, Error as ProtoError, Protobuf}; +use crate::proto::{self as proto, Error as ProtoError, Protobuf}; use crate::types::MockContext; use crate::types::{self as p2p, Address, BlockHash, Height, ProposalPart, Vote}; @@ -33,12 +34,11 @@ impl NetworkCodec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - let status = - proto::blocksync::Status::decode(bytes.as_ref()).map_err(ProtoError::Decode)?; + let status = proto::sync::Status::decode(bytes.as_ref()).map_err(ProtoError::Decode)?; let peer_id = status .peer_id - .ok_or_else(|| ProtoError::missing_field::("peer_id"))?; + .ok_or_else(|| ProtoError::missing_field::("peer_id"))?; Ok(blocksync::Status { peer_id: libp2p_identity::PeerId::from_bytes(&peer_id.id) @@ -52,7 +52,7 @@ impl NetworkCodec> for ProtobufCodec { } fn encode(&self, status: blocksync::Status) -> Result { - let proto = proto::blocksync::Status { + let proto = proto::sync::Status { peer_id: Some(proto::PeerId { id: Bytes::from(status.peer_id.to_bytes()), }), @@ -70,7 +70,7 @@ impl NetworkCodec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - let request = proto::blocksync::Request::decode(bytes).map_err(ProtoError::Decode)?; + let request = proto::sync::Request::decode(bytes).map_err(ProtoError::Decode)?; Ok(blocksync::Request { height: Height::new(request.block_number, request.fork_id), @@ -78,7 +78,7 @@ impl NetworkCodec> for ProtobufCodec { } fn encode(&self, request: blocksync::Request) -> Result { - let proto = proto::blocksync::Request { + let proto = proto::sync::Request { block_number: request.height.block_number, fork_id: request.height.fork_id, }; @@ -91,7 +91,7 @@ impl NetworkCodec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - let response = proto::blocksync::Response::decode(bytes).map_err(ProtoError::Decode)?; + let response = proto::sync::Response::decode(bytes).map_err(ProtoError::Decode)?; Ok(blocksync::Response { height: Height::new(response.block_number, response.fork_id), @@ -100,7 +100,7 @@ impl NetworkCodec> for ProtobufCodec { } fn encode(&self, response: blocksync::Response) -> Result { - let proto = proto::blocksync::Response { + let proto = proto::sync::Response { block_number: response.height.block_number, fork_id: response.height.fork_id, block: response.block.map(encode_synced_block).transpose()?, @@ -114,15 +114,15 @@ impl NetworkCodec> for ProtobufCodec { type Error = ProtoError; fn decode(&self, bytes: Bytes) -> Result, Self::Error> { - let proto = ConsensusMessage::decode(bytes)?; + let proto = proto::ConsensusMessage::decode(bytes)?; let proto_signature = proto .signature - .ok_or_else(|| ProtoError::missing_field::("signature"))?; + .ok_or_else(|| ProtoError::missing_field::("signature"))?; let message = proto .messages - .ok_or_else(|| ProtoError::missing_field::("messages"))?; + .ok_or_else(|| ProtoError::missing_field::("messages"))?; let signature = p2p::Signature::from_proto(proto_signature)?; @@ -137,11 +137,11 @@ impl NetworkCodec> for ProtobufCodec { fn encode(&self, msg: SignedConsensusMsg) -> Result { let message = match msg { - SignedConsensusMsg::Vote(v) => ConsensusMessage { + SignedConsensusMsg::Vote(v) => proto::ConsensusMessage { messages: Some(Messages::Vote(v.to_proto()?)), signature: Some(v.signature.to_proto()?), }, - SignedConsensusMsg::Proposal(p) => ConsensusMessage { + SignedConsensusMsg::Proposal(p) => proto::ConsensusMessage { messages: Some(Messages::Proposal(p.to_proto()?)), signature: Some(p.signature.to_proto()?), }, @@ -185,9 +185,33 @@ where } } +pub(crate) fn encode_synced_block( + synced_block: blocksync::SyncedBlock, +) -> Result { + Ok(proto::sync::SyncedBlock { + block_bytes: synced_block.block_bytes, + certificate: Some(encode_certificate(synced_block.certificate)?), + }) +} + +pub(crate) fn decode_synced_block( + proto: proto::sync::SyncedBlock, +) -> Result, ProtoError> { + let Some(certificate) = proto.certificate else { + return Err(ProtoError::missing_field::( + "certificate", + )); + }; + + Ok(blocksync::SyncedBlock { + block_bytes: proto.block_bytes, + certificate: decode_certificate(certificate)?, + }) +} + pub(crate) fn encode_aggregate_signature( aggregated_signature: AggregatedSignature, -) -> Result { +) -> Result { let signatures = aggregated_signature .signatures .into_iter() @@ -195,50 +219,19 @@ pub(crate) fn encode_aggregate_signature( let validator_address = s.address.to_proto()?; let signature = s.signature.to_proto()?; - Ok(proto::CommitSignature { + Ok(proto::sync::CommitSignature { validator_address: Some(validator_address), signature: Some(signature), - extension: s - .extension - .map(|e| -> Result<_, ProtoError> { - Ok(proto::Extension { - data: e.message.data, - signature: Some(e.signature.to_proto()?), - }) - }) - .transpose()?, + extension: s.extension.map(encode_extension).transpose()?, }) }) .collect::>()?; - Ok(proto::AggregatedSignature { signatures }) -} - -pub(crate) fn encode_certificate( - certificate: CommitCertificate, -) -> Result { - Ok(proto::CommitCertificate { - fork_id: certificate.height.fork_id, - block_number: certificate.height.block_number, - round: certificate.round.as_u32().expect("round should not be nil"), - block_hash: Some(certificate.value_id.to_proto()?), - aggregated_signature: Some(encode_aggregate_signature( - certificate.aggregated_signature, - )?), - }) -} - -pub(crate) fn encode_synced_block( - synced_block: blocksync::SyncedBlock, -) -> Result { - Ok(proto::blocksync::SyncedBlock { - block_bytes: synced_block.block_bytes, - certificate: Some(encode_certificate(synced_block.certificate)?), - }) + Ok(proto::sync::AggregatedSignature { signatures }) } pub(crate) fn decode_aggregated_signature( - signature: proto::AggregatedSignature, + signature: proto::sync::AggregatedSignature, ) -> Result, ProtoError> { let signatures = signature .signatures @@ -246,28 +239,19 @@ pub(crate) fn decode_aggregated_signature( .map(|s| { let signature = s .signature - .ok_or_else(|| ProtoError::missing_field::("signature")) + .ok_or_else(|| { + ProtoError::missing_field::("signature") + }) .and_then(p2p::Signature::from_proto)?; let address = s .validator_address .ok_or_else(|| { - ProtoError::missing_field::("validator_address") + ProtoError::missing_field::("validator_address") }) .and_then(Address::from_proto)?; - let extension = s - .extension - .map(|e| -> Result<_, ProtoError> { - let extension = Extension::from(e.data); - let signature = e - .signature - .ok_or_else(|| ProtoError::missing_field::("signature")) - .and_then(p2p::Signature::from_proto)?; - - Ok(SignedExtension::new(extension, signature)) - }) - .transpose()?; + let extension = s.extension.map(decode_extension).transpose()?; Ok(CommitSignature { address, @@ -280,13 +264,48 @@ pub(crate) fn decode_aggregated_signature( Ok(AggregatedSignature { signatures }) } +pub(crate) fn encode_extension( + ext: SignedExtension, +) -> Result { + Ok(proto::Extension { + data: ext.message.data, + signature: Some(ext.signature.to_proto()?), + }) +} + +pub(crate) fn decode_extension( + ext: proto::Extension, +) -> Result, ProtoError> { + let extension = Extension::from(ext.data); + let signature = ext + .signature + .ok_or_else(|| ProtoError::missing_field::("signature")) + .and_then(p2p::Signature::from_proto)?; + + Ok(SignedExtension::new(extension, signature)) +} + +pub(crate) fn encode_certificate( + certificate: CommitCertificate, +) -> Result { + Ok(proto::sync::CommitCertificate { + fork_id: certificate.height.fork_id, + block_number: certificate.height.block_number, + round: certificate.round.as_u32().expect("round should not be nil"), + block_hash: Some(certificate.value_id.to_proto()?), + aggregated_signature: Some(encode_aggregate_signature( + certificate.aggregated_signature, + )?), + }) +} + pub(crate) fn decode_certificate( - certificate: proto::CommitCertificate, + certificate: proto::sync::CommitCertificate, ) -> Result, ProtoError> { let value_id = if let Some(block_hash) = certificate.block_hash { BlockHash::from_proto(block_hash)? } else { - return Err(ProtoError::missing_field::( + return Err(ProtoError::missing_field::( "block_hash", )); }; @@ -294,7 +313,7 @@ pub(crate) fn decode_certificate( let aggregated_signature = if let Some(agg_sig) = certificate.aggregated_signature { decode_aggregated_signature(agg_sig)? } else { - return Err(ProtoError::missing_field::( + return Err(ProtoError::missing_field::( "aggregated_signature", )); }; @@ -309,19 +328,13 @@ pub(crate) fn decode_certificate( Ok(certificate) } -pub(crate) fn decode_synced_block( - synced_block: proto::blocksync::SyncedBlock, -) -> Result, ProtoError> { - let certificate = if let Some(certificate) = synced_block.certificate { - certificate - } else { - return Err(ProtoError::missing_field::( - "certificate", - )); +pub(crate) fn encode_block(block: &Block) -> Result, ProtoError> { + let proto = proto::sync::Block { + fork_id: block.height.fork_id, + block_number: block.height.block_number, + transactions: Some(block.transactions.to_proto()?), + block_hash: Some(block.block_hash.to_proto()?), }; - Ok(blocksync::SyncedBlock { - block_bytes: synced_block.block_bytes, - certificate: decode_certificate(certificate)?, - }) + Ok(proto.encode_to_vec()) } diff --git a/code/crates/starknet/p2p-proto/build.rs b/code/crates/starknet/p2p-proto/build.rs index 4a5fac9a4..3d026ee65 100644 --- a/code/crates/starknet/p2p-proto/build.rs +++ b/code/crates/starknet/p2p-proto/build.rs @@ -1,6 +1,6 @@ fn main() -> Result<(), Box> { let protos = &[ - "./proto/blocksync.proto", + "./proto/sync.proto", "./proto/p2p/proto/common.proto", "./proto/p2p/proto/header.proto", "./proto/p2p/proto/transaction.proto", diff --git a/code/crates/starknet/p2p-proto/proto/blocksync.proto b/code/crates/starknet/p2p-proto/proto/blocksync.proto deleted file mode 100644 index d851d5d52..000000000 --- a/code/crates/starknet/p2p-proto/proto/blocksync.proto +++ /dev/null @@ -1,38 +0,0 @@ -syntax = "proto3"; - -package blocksync; - -import "p2p/proto/common.proto"; -import "p2p/proto/consensus.proto"; -import "p2p/proto/transaction.proto"; - -message Status { - PeerID peer_id = 1; - uint64 block_number = 2; - uint64 fork_id = 3; - uint64 earliest_block_number = 4; - uint64 earliest_fork_id = 5; -} - -message Request { - uint64 block_number = 1; - uint64 fork_id = 2; -} - -message Response { - uint64 block_number = 1; - uint64 fork_id = 2; - SyncedBlock block = 3; -} - -message SyncedBlock { - bytes block_bytes = 1; - CommitCertificate certificate = 2; -} - -message Block { - uint64 fork_id = 1; - uint64 block_number = 2; - Transactions transactions = 3; - Hash block_hash = 4; -} diff --git a/code/crates/starknet/p2p-proto/proto/p2p/proto/consensus.proto b/code/crates/starknet/p2p-proto/proto/p2p/proto/consensus.proto index 89359584d..9715307ca 100644 --- a/code/crates/starknet/p2p-proto/proto/p2p/proto/consensus.proto +++ b/code/crates/starknet/p2p-proto/proto/p2p/proto/consensus.proto @@ -94,24 +94,3 @@ message Extension { ConsensusSignature signature = 2; } -// ADDED -message CommitSignature { - // TODO - add flag (no vote, nil, value?) - Address validator_address = 1; - ConsensusSignature signature = 2; - optional Extension extension = 3; -} - -// ADDED -message AggregatedSignature { - repeated CommitSignature signatures = 1; -} - -// ADDED -message CommitCertificate { - uint64 fork_id = 1; - uint64 block_number = 2; - uint32 round = 3; - Hash block_hash = 4; - AggregatedSignature aggregated_signature = 5; -} diff --git a/code/crates/starknet/p2p-proto/proto/sync.proto b/code/crates/starknet/p2p-proto/proto/sync.proto new file mode 100644 index 000000000..316e4b1fa --- /dev/null +++ b/code/crates/starknet/p2p-proto/proto/sync.proto @@ -0,0 +1,68 @@ +syntax = "proto3"; + +package sync; + +import "p2p/proto/common.proto"; +import "p2p/proto/consensus.proto"; +import "p2p/proto/transaction.proto"; + +message Status { + PeerID peer_id = 1; + uint64 block_number = 2; + uint64 fork_id = 3; + uint64 earliest_block_number = 4; + uint64 earliest_fork_id = 5; +} + +message Request { + uint64 block_number = 1; + uint64 fork_id = 2; +} + +message Response { + uint64 block_number = 1; + uint64 fork_id = 2; + SyncedBlock block = 3; +} + +message SyncedBlock { + bytes block_bytes = 1; + CommitCertificate certificate = 2; +} + +message Block { + uint64 fork_id = 1; + uint64 block_number = 2; + Transactions transactions = 3; + Hash block_hash = 4; +} + +message CommitSignature { + // TODO - add flag (no vote, nil, value?) + Address validator_address = 1; + ConsensusSignature signature = 2; + optional Extension extension = 3; +} + +message AggregatedSignature { + repeated CommitSignature signatures = 1; +} + +message CommitCertificate { + uint64 fork_id = 1; + uint64 block_number = 2; + uint32 round = 3; + Hash block_hash = 4; + AggregatedSignature aggregated_signature = 5; +} + +message ProposedValue { + uint64 fork_id = 1; + uint64 block_number = 2; + uint32 round = 3; + optional uint32 valid_round = 4; + Address proposer = 5; + bytes value = 6; + bool validity = 7; + optional Extension extension = 8; +} diff --git a/code/crates/starknet/p2p-proto/src/lib.rs b/code/crates/starknet/p2p-proto/src/lib.rs index 276bf5a6e..ea999161a 100644 --- a/code/crates/starknet/p2p-proto/src/lib.rs +++ b/code/crates/starknet/p2p-proto/src/lib.rs @@ -2,6 +2,6 @@ include!(concat!(env!("OUT_DIR"), "/p2p.rs")); -pub mod blocksync { - include!(concat!(env!("OUT_DIR"), "/blocksync.rs")); +pub mod sync { + include!(concat!(env!("OUT_DIR"), "/sync.rs")); } diff --git a/code/crates/starknet/p2p-types/src/block.rs b/code/crates/starknet/p2p-types/src/block.rs index cfc87663d..a6f7297d9 100644 --- a/code/crates/starknet/p2p-types/src/block.rs +++ b/code/crates/starknet/p2p-types/src/block.rs @@ -11,7 +11,7 @@ pub struct Block { } impl Protobuf for Block { - type Proto = proto::blocksync::Block; + type Proto = proto::sync::Block; fn from_proto(proto: Self::Proto) -> Result { let transactions = proto From 29311f242caca69335231d5ba0223c369faeb6bf Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Mon, 25 Nov 2024 20:23:52 +0100 Subject: [PATCH 2/2] chore(code/consensus): Only buffer inputs for the highest height seen so far (#566) --- code/crates/consensus/src/handle/proposal.rs | 13 +- .../consensus/src/handle/proposed_value.rs | 13 +- code/crates/consensus/src/handle/vote.rs | 21 ++- code/crates/consensus/src/state.rs | 19 +- code/crates/consensus/src/util/max_queue.rs | 163 ++++++++++++++++++ code/crates/consensus/src/util/mod.rs | 1 + 6 files changed, 196 insertions(+), 34 deletions(-) create mode 100644 code/crates/consensus/src/util/max_queue.rs diff --git a/code/crates/consensus/src/handle/proposal.rs b/code/crates/consensus/src/handle/proposal.rs index 9e6742f9e..1d9f20dbe 100644 --- a/code/crates/consensus/src/handle/proposal.rs +++ b/code/crates/consensus/src/handle/proposal.rs @@ -50,20 +50,15 @@ where // Drop all others. if state.driver.round() == Round::Nil { debug!("Received proposal at round -1, queuing for later"); - state - .input_queue - .push_back(Input::Proposal(signed_proposal)); + state.buffer_input(signed_proposal.height(), Input::Proposal(signed_proposal)); + return Ok(()); } if proposal_height > consensus_height { - if consensus_height.increment() == proposal_height { - debug!("Received proposal for next height, queuing for later"); + debug!("Received proposal for higher height, queuing for later"); + state.buffer_input(signed_proposal.height(), Input::Proposal(signed_proposal)); - state - .input_queue - .push_back(Input::Proposal(signed_proposal)); - } return Ok(()); } diff --git a/code/crates/consensus/src/handle/proposed_value.rs b/code/crates/consensus/src/handle/proposed_value.rs index 32cc2db69..7602d825a 100644 --- a/code/crates/consensus/src/handle/proposed_value.rs +++ b/code/crates/consensus/src/handle/proposed_value.rs @@ -28,12 +28,13 @@ where } if state.driver.height() < proposed_value.height { - if state.driver.height().increment() == proposed_value.height { - debug!("Received value for next height, queuing for later"); - state - .input_queue - .push_back(Input::ProposedValue(proposed_value, origin)); - } + debug!("Received value for higher height, queuing for later"); + + state.buffer_input( + proposed_value.height, + Input::ProposedValue(proposed_value, origin), + ); + return Ok(()); } diff --git a/code/crates/consensus/src/handle/vote.rs b/code/crates/consensus/src/handle/vote.rs index 27efded2f..2fb4e0477 100644 --- a/code/crates/consensus/src/handle/vote.rs +++ b/code/crates/consensus/src/handle/vote.rs @@ -55,21 +55,20 @@ where "Received vote at round -1, queuing for later" ); - state.input_queue.push_back(Input::Vote(signed_vote)); + state.buffer_input(vote_height, Input::Vote(signed_vote)); return Ok(()); } if consensus_height < vote_height { - if consensus_height.increment() == vote_height { - debug!( - consensus.height = %consensus_height, - vote.height = %vote_height, - validator = %validator_address, - "Received vote for next height, queuing for later" - ); - - state.input_queue.push_back(Input::Vote(signed_vote)); - } + debug!( + consensus.height = %consensus_height, + vote.height = %vote_height, + validator = %validator_address, + "Received vote for higher height, queuing for later" + ); + + state.buffer_input(vote_height, Input::Vote(signed_vote)); + return Ok(()); } diff --git a/code/crates/consensus/src/state.rs b/code/crates/consensus/src/state.rs index a33367419..9397a0030 100644 --- a/code/crates/consensus/src/state.rs +++ b/code/crates/consensus/src/state.rs @@ -1,13 +1,12 @@ -use std::collections::{BTreeMap, BTreeSet, VecDeque}; -use tracing::debug; +use std::collections::{BTreeMap, BTreeSet}; +use tracing::{debug, warn}; use malachite_common::*; use malachite_driver::Driver; -use tracing::warn; use crate::input::Input; -use crate::{FullProposal, FullProposalKeeper}; -use crate::{Params, ProposedValue}; +use crate::util::max_queue::MaxQueue; +use crate::{FullProposal, FullProposalKeeper, Params, ProposedValue}; /// The state maintained by consensus for processing a [`Input`][crate::Input]. pub struct State @@ -23,9 +22,8 @@ where /// Driver for the per-round consensus state machine pub driver: Driver, - /// A queue of inputs that were received before the - /// driver started the new height. - pub input_queue: VecDeque>, + /// A queue of inputs that were received before the driver started. + pub input_queue: MaxQueue>, /// The proposals to decide on. pub full_proposal_keeper: FullProposalKeeper, @@ -151,6 +149,11 @@ where self.full_proposal_keeper.remove_full_proposals(height) } + /// Queue an input for later processing, only keep inputs for the highest height seen so far. + pub fn buffer_input(&mut self, height: Ctx::Height, input: Input) { + self.input_queue.push(height, input); + } + pub fn print_state(&self) { if let Some(per_round) = self.driver.votes().per_round(self.driver.round()) { warn!( diff --git a/code/crates/consensus/src/util/max_queue.rs b/code/crates/consensus/src/util/max_queue.rs new file mode 100644 index 000000000..c783a125d --- /dev/null +++ b/code/crates/consensus/src/util/max_queue.rs @@ -0,0 +1,163 @@ +/// A data structure that maintains a queue of values associated with monotonically increasing indices, +/// retaining only those values associated with the maximum index seen so far. +/// +/// # Type Parameters +/// - `I`: The type of the index associated with each value in the queue. +/// - `T`: The type of values stored in the queue. +/// +/// # Invariant +/// - All values in the queue are associated with the maximum index observed so far. +#[derive(Clone, Debug)] +pub struct MaxQueue { + /// The highest index observed, which determines the values retained in the queue. + highest_index: I, + + /// A vector storing the values associated with the maximum index. + /// Values are appended in the order they are pushed. + queue: Vec, +} + +impl Default for MaxQueue +where + I: Default, +{ + /// Creates a `MaxQueue` with the default index value and an empty queue. + /// + /// # Returns + /// - A `MaxQueue` instance with `current` initialized to the default value of `I` and an empty `queue`. + fn default() -> Self { + Self { + highest_index: Default::default(), + queue: Default::default(), + } + } +} + +impl MaxQueue { + /// Constructs a new, empty `MaxQueue` with its index set to default. + /// + /// # Returns + /// - A new `MaxQueue` with default `current` index and an empty queue. + pub fn new() -> Self + where + I: Default, + { + Self::default() + } + + /// Pushes a value into the queue with an associated index. + /// + /// - If the `index` is greater than the highest index seen so far, the queue is cleared, + /// the highest index seen so far is updated, and the value is added. + /// - If the `index` is equal to the highest index seen so far, the value is appended to the queue. + /// - If the `index` is less than the highest index seen so far, the value is ignored. + /// + /// # Arguments + /// - `index`: The index associated with the value. + /// - `value`: The value to be stored in the queue. + /// + /// # Returns + /// - Whether or not the value was inserted into the queue. + #[allow(clippy::comparison_chain)] + pub fn push(&mut self, index: I, value: T) -> bool + where + I: Ord, + { + if index > self.highest_index { + // New highest index, clear the queue, insert the new value + self.highest_index = index; + self.queue.clear(); + self.queue.push(value); + true + } else if index == self.highest_index { + // Same index, insert the new value + self.queue.push(value); + true + } else { + // Smaller index, ignore the value + false + } + } + + /// Returns an iterator over references to the values in the queue. + /// + /// # Returns + /// - An iterator producing references to each value stored in the queue in order of insertion. + pub fn iter(&self) -> impl Iterator { + self.queue.iter() + } + + /// Returns how many values are stored in queue. + pub fn len(&self) -> usize { + self.queue.len() + } + + /// Returns whether the queue is empty. + pub fn is_empty(&self) -> bool { + self.queue.is_empty() + } + + /// Returns the queue. + pub fn into_vec(self) -> Vec { + self.queue + } + + /// Returns a clone of the queue. + pub fn to_vec(&self) -> Vec + where + T: Clone, + { + self.queue.to_vec() + } +} + +/// Consumes the `MaxQueue` and returns an iterator that yields its values. +/// +/// # Returns +/// - An iterator over values in the queue. +impl IntoIterator for MaxQueue { + type Item = T; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.queue.into_iter() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_max_queue() { + let mut queue = MaxQueue::new(); + + assert!(queue.is_empty()); + assert_eq!(queue.len(), 0); + + assert!(queue.push(1, "one")); + assert_eq!(queue.len(), 1); + assert!(!queue.is_empty()); + assert_eq!(queue.to_vec(), vec!["one"]); + + assert!(queue.push(2, "two")); + assert_eq!(queue.len(), 1); + assert!(!queue.is_empty()); + assert_eq!(queue.to_vec(), vec!["two"]); + + assert!(!queue.push(1, "one again")); + assert_eq!(queue.len(), 1); + assert!(!queue.is_empty()); + assert_eq!(queue.to_vec(), vec!["two"]); + + assert!(queue.push(2, "two again")); + assert_eq!(queue.len(), 2); + assert!(!queue.is_empty()); + assert_eq!(queue.to_vec(), vec!["two", "two again"]); + + assert!(queue.push(3, "three")); + assert_eq!(queue.len(), 1); + assert!(!queue.is_empty()); + assert_eq!(queue.to_vec(), vec!["three"]); + } +} diff --git a/code/crates/consensus/src/util/mod.rs b/code/crates/consensus/src/util/mod.rs index c3b710974..221c72735 100644 --- a/code/crates/consensus/src/util/mod.rs +++ b/code/crates/consensus/src/util/mod.rs @@ -1 +1,2 @@ +pub mod max_queue; pub mod pretty;