Skip to content

Commit

Permalink
chore(code/host): Store undecided blocks before passing them on to co…
Browse files Browse the repository at this point in the history
…nsensus (#569)
  • Loading branch information
romac authored Nov 25, 2024
1 parent 70a1a90 commit d68bb79
Show file tree
Hide file tree
Showing 10 changed files with 422 additions and 218 deletions.
60 changes: 50 additions & 10 deletions code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ impl HostState {
stream_id
}

#[tracing::instrument(skip_all, fields(%height, %round))]
pub async fn build_block_from_parts(
&self,
parts: &[Arc<ProposalPart>],
height: Height,
round: Round,
) -> Option<(ProposedValue<MockContext>, Block)> {
let value = self.build_value_from_parts(parts, height, round).await?;

let txes = parts
.iter()
.filter_map(|part| part.as_transactions())
.flat_map(|txes| txes.to_vec())
.collect::<Vec<_>>();

let block = Block {
height,
transactions: Transactions::new(txes),
block_hash: value.value,
};

Some((value, block))
}

#[tracing::instrument(skip_all, fields(%height, %round))]
pub async fn build_value_from_parts(
&self,
Expand Down Expand Up @@ -293,7 +317,7 @@ impl StarknetHost {
match state.block_store.prune(retain_height).await {
Ok(pruned) => {
debug!(
%retain_height, pruned = pruned.iter().join(", "),
%retain_height, pruned_heights = pruned.iter().join(", "),
"Pruned the block store"
);
}
Expand Down Expand Up @@ -371,6 +395,7 @@ impl Actor for StarknetHost {

while let Some(part) = rx_part.recv().await {
state.host.part_store.store(height, round, part.clone());

if state.host.params.value_payload.include_parts() {
debug!(%stream_id, %sequence, "Broadcasting proposal part");

Expand Down Expand Up @@ -404,17 +429,28 @@ impl Actor for StarknetHost {

let parts = state.host.part_store.all_parts(height, round);

let extension = state.host.generate_vote_extension(height, round);
let Some((value, block)) =
state.build_block_from_parts(&parts, height, round).await
else {
error!(%height, %round, "Failed to build block from parts");
return Ok(());
};

if let Some(value) = state.build_value_from_parts(&parts, height, round).await {
reply_to.send(LocallyProposedValue::new(
value.height,
value.round,
value.value,
extension,
))?;
if let Err(e) = state
.block_store
.store_undecided_block(value.height, value.round, block)
.await
{
error!(%e, %height, %round, "Failed to store the proposed block");
}

reply_to.send(LocallyProposedValue::new(
value.height,
value.round,
value.value,
value.extension,
))?;

Ok(())
}

Expand Down Expand Up @@ -545,7 +581,11 @@ impl Actor for StarknetHost {
}

// Build the block from transaction parts and certificate, and store it
if let Err(e) = state.block_store.store(&certificate, &all_txes).await {
if let Err(e) = state
.block_store
.store_decided_block(&certificate, &all_txes)
.await
{
error!(%e, %height, %round, "Failed to store the block");
}

Expand Down
148 changes: 83 additions & 65 deletions code/crates/starknet/host/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ use std::ops::RangeBounds;
use std::path::Path;
use std::sync::Arc;

use malachite_common::CommitCertificate;
use malachite_proto::Protobuf;

use prost::Message;
use redb::ReadableTable;
use thiserror::Error;

use malachite_common::{CommitCertificate, Round};
use malachite_proto::Protobuf;

use crate::codec;
use crate::proto::{self as proto, Error as ProtoError};
use crate::types::MockContext;
use crate::types::{Block, Height, Transaction, Transactions};
use crate::types::{Block, BlockHash, Height, Transaction, Transactions};

mod keys;
use keys::{HeightKey, UndecidedBlockKey};

#[derive(Clone, Debug)]
pub struct DecidedBlock {
Expand All @@ -21,7 +24,7 @@ pub struct DecidedBlock {
}

fn decode_certificate(bytes: &[u8]) -> Result<CommitCertificate<MockContext>, ProtoError> {
let proto = proto::CommitCertificate::decode(bytes)?;
let proto = proto::sync::CommitCertificate::decode(bytes)?;
codec::decode_certificate(proto)
}

Expand Down Expand Up @@ -54,52 +57,14 @@ pub enum StoreError {
TaskJoin(#[from] tokio::task::JoinError),
}

#[derive(Copy, Clone, Debug)]
struct HeightKey;

impl redb::Value for HeightKey {
type SelfType<'a> = Height;

type AsBytes<'a> = Vec<u8>;

fn fixed_width() -> Option<usize> {
Some(core::mem::size_of::<u64>() * 2)
}

fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
where
Self: 'a,
{
let (fork_id, block_number) = <(u64, u64) as redb::Value>::from_bytes(data);

Height {
fork_id,
block_number,
}
}

fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
where
Self: 'a,
Self: 'b,
{
<(u64, u64) as redb::Value>::as_bytes(&(value.fork_id, value.block_number))
}

fn type_name() -> redb::TypeName {
redb::TypeName::new("starknet::Height")
}
}
const CERTIFICATES_TABLE: redb::TableDefinition<HeightKey, Vec<u8>> =
redb::TableDefinition::new("certificates");

impl redb::Key for HeightKey {
fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
<(u64, u64) as redb::Key>::compare(data1, data2)
}
}
const DECIDED_BLOCKS_TABLE: redb::TableDefinition<HeightKey, Vec<u8>> =
redb::TableDefinition::new("decided_blocks");

const BLOCK_TABLE: redb::TableDefinition<HeightKey, Vec<u8>> = redb::TableDefinition::new("blocks");
const CERTIFICATE_TABLE: redb::TableDefinition<HeightKey, Vec<u8>> =
redb::TableDefinition::new("certificates");
const UNDECIDED_BLOCKS_TABLE: redb::TableDefinition<UndecidedBlockKey, Vec<u8>> =
redb::TableDefinition::new("undecided_blocks");

struct Db {
db: redb::Database,
Expand All @@ -115,12 +80,12 @@ impl Db {
fn get(&self, height: Height) -> Result<Option<DecidedBlock>, StoreError> {
let tx = self.db.begin_read()?;
let block = {
let table = tx.open_table(BLOCK_TABLE)?;
let table = tx.open_table(DECIDED_BLOCKS_TABLE)?;
let value = table.get(&height)?;
value.and_then(|value| Block::from_bytes(&value.value()).ok())
};
let certificate = {
let table = tx.open_table(CERTIFICATE_TABLE)?;
let table = tx.open_table(CERTIFICATES_TABLE)?;
let value = table.get(&height)?;
value.and_then(|value| decode_certificate(&value.value()).ok())
};
Expand All @@ -132,24 +97,41 @@ impl Db {
Ok(decided_block)
}

fn insert(&self, decided_block: DecidedBlock) -> Result<(), StoreError> {
fn insert_decided_block(&self, decided_block: DecidedBlock) -> Result<(), StoreError> {
let height = decided_block.block.height;

let tx = self.db.begin_write()?;
{
let mut blocks = tx.open_table(BLOCK_TABLE)?;
let mut blocks = tx.open_table(DECIDED_BLOCKS_TABLE)?;
blocks.insert(height, decided_block.block.to_bytes()?.to_vec())?;
}
{
let mut certificates = tx.open_table(CERTIFICATE_TABLE)?;
let mut certificates = tx.open_table(CERTIFICATES_TABLE)?;
certificates.insert(height, encode_certificate(decided_block.certificate)?)?;
}
tx.commit()?;

Ok(())
}

fn range<Table>(
fn insert_undecided_block(
&self,
height: Height,
round: Round,
block: Block,
) -> Result<(), StoreError> {
let key = (height, round, block.block_hash);
let value = codec::encode_block(&block)?;
let tx = self.db.begin_write()?;
{
let mut table = tx.open_table(UNDECIDED_BLOCKS_TABLE)?;
table.insert(key, value)?;
}
tx.commit()?;
Ok(())
}

fn height_range<Table>(
&self,
table: &Table,
range: impl RangeBounds<Height>,
Expand All @@ -164,14 +146,39 @@ impl Db {
.collect::<Vec<_>>())
}

fn undecided_block_range<Table>(
&self,
table: &Table,
range: impl RangeBounds<(Height, Round, BlockHash)>,
) -> Result<Vec<(Height, Round, BlockHash)>, StoreError>
where
Table: redb::ReadableTable<UndecidedBlockKey, Vec<u8>>,
{
Ok(table
.range(range)?
.flatten()
.map(|(key, _)| key.value())
.collect::<Vec<_>>())
}

fn prune(&self, retain_height: Height) -> Result<Vec<Height>, StoreError> {
let tx = self.db.begin_write().unwrap();
let pruned = {
let mut blocks = tx.open_table(BLOCK_TABLE)?;
let mut certificates = tx.open_table(CERTIFICATE_TABLE)?;
let keys = self.range(&blocks, ..retain_height)?;
let mut undecided = tx.open_table(UNDECIDED_BLOCKS_TABLE)?;
let keys = self.undecided_block_range(
&undecided,
..(retain_height, Round::Nil, BlockHash::new([0; 32])),
)?;
for key in keys {
undecided.remove(key)?;
}

let mut decided = tx.open_table(DECIDED_BLOCKS_TABLE)?;
let mut certificates = tx.open_table(CERTIFICATES_TABLE)?;

let keys = self.height_range(&decided, ..retain_height)?;
for key in &keys {
blocks.remove(key)?;
decided.remove(key)?;
certificates.remove(key)?;
}
keys
Expand All @@ -183,23 +190,24 @@ impl Db {

fn first_key(&self) -> Option<Height> {
let tx = self.db.begin_read().unwrap();
let table = tx.open_table(BLOCK_TABLE).unwrap();
let table = tx.open_table(DECIDED_BLOCKS_TABLE).unwrap();
let (key, _) = table.first().ok()??;
Some(key.value())
}

fn last_key(&self) -> Option<Height> {
let tx = self.db.begin_read().unwrap();
let table = tx.open_table(BLOCK_TABLE).unwrap();
let table = tx.open_table(DECIDED_BLOCKS_TABLE).unwrap();
let (key, _) = table.last().ok()??;
Some(key.value())
}

fn create_tables(&self) -> Result<(), StoreError> {
let tx = self.db.begin_write()?;
// Implicitly creates the tables if they do not exist yet
let _ = tx.open_table(BLOCK_TABLE)?;
let _ = tx.open_table(CERTIFICATE_TABLE)?;
let _ = tx.open_table(DECIDED_BLOCKS_TABLE)?;
let _ = tx.open_table(CERTIFICATES_TABLE)?;
let _ = tx.open_table(UNDECIDED_BLOCKS_TABLE)?;
tx.commit()?;
Ok(())
}
Expand Down Expand Up @@ -231,7 +239,7 @@ impl BlockStore {
tokio::task::spawn_blocking(move || db.get(height)).await?
}

pub async fn store(
pub async fn store_decided_block(
&self,
certificate: &CommitCertificate<MockContext>,
txes: &[Transaction],
Expand All @@ -246,7 +254,17 @@ impl BlockStore {
};

let db = Arc::clone(&self.db);
tokio::task::spawn_blocking(move || db.insert(decided_block)).await?
tokio::task::spawn_blocking(move || db.insert_decided_block(decided_block)).await?
}

pub async fn store_undecided_block(
&self,
height: Height,
round: Round,
block: Block,
) -> Result<(), StoreError> {
let db = Arc::clone(&self.db);
tokio::task::spawn_blocking(move || db.insert_undecided_block(height, round, block)).await?
}

pub async fn prune(&self, retain_height: Height) -> Result<Vec<Height>, StoreError> {
Expand Down
Loading

0 comments on commit d68bb79

Please sign in to comment.