Skip to content

Commit

Permalink
chore(code/part_stream): Use random StreamId (#583)
Browse files Browse the repository at this point in the history
* chore(code/part_stream): Use random StreamId to ensure a stream is not re-used when a node crashes and restarts quickly

* Start with random StreamId and increment from there
  • Loading branch information
romac authored and cason committed Nov 22, 2024
1 parent 96be36e commit b76192f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 10 deletions.
27 changes: 19 additions & 8 deletions code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use eyre::eyre;

use itertools::Itertools;
use ractor::{async_trait, Actor, ActorProcessingErr, SpawnErr};
use rand::rngs::StdRng;
use rand::{RngCore, SeedableRng};
use sha3::Digest;
use tokio::time::Instant;
use tracing::{debug, error, trace, warn};
Expand Down Expand Up @@ -43,18 +45,29 @@ pub struct HostState {
}

impl HostState {
pub fn new(host: MockHost, db_path: impl AsRef<Path>) -> Self {
pub fn new<R>(host: MockHost, db_path: impl AsRef<Path>, rng: &mut R) -> Self
where
R: RngCore,
{
Self {
height: Height::new(0, 0),
round: Round::Nil,
proposer: None,
host,
block_store: BlockStore::new(db_path).unwrap(),
part_streams_map: PartStreamsMap::default(),
next_stream_id: StreamId::default(),
next_stream_id: rng.next_u64(),
}
}

pub fn next_stream_id(&mut self) -> StreamId {
let stream_id = self.next_stream_id;
// Wrap around if we get to u64::MAX, which may happen if the initial
// stream id was close to it already.
self.next_stream_id = self.next_stream_id.wrapping_add(1);
stream_id
}

#[tracing::instrument(skip_all, fields(%height, %round))]
pub async fn build_value_from_parts(
&self,
Expand Down Expand Up @@ -241,13 +254,12 @@ impl StarknetHost {
) -> Result<HostRef, SpawnErr> {
let db_dir = home_dir.join("db");
std::fs::create_dir_all(&db_dir).map_err(|e| SpawnErr::StartupFailed(e.into()))?;

let db_path = db_dir.join("blocks.db");

let (actor_ref, _) = Actor::spawn(
None,
Self::new(mempool, gossip_consensus, metrics),
HostState::new(host, db_path),
HostState::new(host, db_path, &mut StdRng::from_entropy()),
)
.await?;

Expand All @@ -265,6 +277,7 @@ impl StarknetHost {
metrics,
}
}

async fn prune_block_store(&self, state: &mut HostState) {
let max_height = state.block_store.last_height().unwrap_or_default();
let max_retain_blocks = state.host.params.max_retain_blocks as u64;
Expand Down Expand Up @@ -352,8 +365,7 @@ impl Actor for StarknetHost {
let (mut rx_part, rx_hash) =
state.host.build_new_proposal(height, round, deadline).await;

let stream_id = state.next_stream_id;
state.next_stream_id += 1;
let stream_id = state.next_stream_id();

let mut sequence = 0;

Expand Down Expand Up @@ -417,8 +429,7 @@ impl Actor for StarknetHost {

let mut rx_part = state.host.send_known_proposal(value_id).await;

let stream_id = state.next_stream_id;
state.next_stream_id += 1;
let stream_id = state.next_stream_id();

let init = ProposalInit {
height,
Expand Down
5 changes: 3 additions & 2 deletions code/crates/starknet/host/src/mock/host/build_proposal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ use std::sync::Arc;
use bytes::Bytes;
use bytesize::ByteSize;
use eyre::eyre;
use rand::RngCore;
use rand::rngs::StdRng;
use rand::{RngCore, SeedableRng};
use sha3::Digest;
use tokio::sync::{mpsc, oneshot};
use tokio::time::Instant;
Expand Down Expand Up @@ -151,7 +152,7 @@ async fn run_build_proposal_task(
// BlockProof
{
// TODO: Compute actual "proof"
let mut rng = rand::rngs::OsRng;
let mut rng = StdRng::from_entropy();
let mut proof = vec![0; 32];
rng.fill_bytes(&mut proof);

Expand Down

0 comments on commit b76192f

Please sign in to comment.