Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(code/consensus): Add support for full nodes #750

Merged
merged 7 commits into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 41 additions & 35 deletions code/crates/core-consensus/src/handle/driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,36 +194,39 @@ where
"Proposing value"
);

let signed_proposal = sign_proposal(co, proposal).await?;

if signed_proposal.pol_round().is_defined() {
perform!(
co,
Effect::RestreamValue(
signed_proposal.height(),
signed_proposal.round(),
signed_proposal.pol_round(),
signed_proposal.validator_address().clone(),
signed_proposal.value().id(),
Default::default()
)
);
// Only sign and publish if we're in the validator set
if state.is_validator() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the proposal filtering, seems we are trying to defend against a bug where get_proposer() returns a non-validator/ full node address...but in this case the is_validator() should also fail.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes it's just defensive coding. Maybe we can change the check to do a debug_assert or even an assert in order to ensure this code path is never hit.

let signed_proposal = sign_proposal(co, proposal).await?;

if signed_proposal.pol_round().is_defined() {
perform!(
co,
Effect::RestreamValue(
signed_proposal.height(),
signed_proposal.round(),
signed_proposal.pol_round(),
signed_proposal.validator_address().clone(),
signed_proposal.value().id(),
Default::default()
)
);
}

on_proposal(co, state, metrics, signed_proposal.clone()).await?;

// Proposal messages should not be broadcasted if they are implicit,
// instead they should be inferred from the block parts.
if state.params.value_payload.include_proposal() {
perform!(
co,
Effect::Publish(
SignedConsensusMsg::Proposal(signed_proposal),
Default::default()
)
);
};
}

on_proposal(co, state, metrics, signed_proposal.clone()).await?;

// Proposal messages should not be broadcasted if they are implicit,
// instead they should be inferred from the block parts.
if state.params.value_payload.include_proposal() {
perform!(
co,
Effect::Publish(
SignedConsensusMsg::Proposal(signed_proposal),
Default::default()
)
);
};

Ok(())
}

Expand All @@ -235,15 +238,18 @@ where
"Voting",
);

let extended_vote = extend_vote(vote, state);
let signed_vote = sign_vote(co, extended_vote).await?;
// Only sign and publish if we're in the validator set
if state.is_validator() {
let extended_vote = extend_vote(vote, state);
let signed_vote = sign_vote(co, extended_vote).await?;

on_vote(co, state, metrics, signed_vote.clone()).await?;
on_vote(co, state, metrics, signed_vote.clone()).await?;

perform!(
co,
Effect::Publish(SignedConsensusMsg::Vote(signed_vote), Default::default())
);
perform!(
co,
Effect::Publish(SignedConsensusMsg::Vote(signed_vote), Default::default())
);
}

Ok(())
}
Expand Down
25 changes: 14 additions & 11 deletions code/crates/core-consensus/src/handle/vote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,21 @@ where

debug_assert_eq!(consensus_height, vote_height);

// Append the vote to the Write-ahead Log
perform!(
co,
Effect::WalAppendMessage(
SignedConsensusMsg::Vote(signed_vote.clone()),
Default::default()
)
);
// Only append to WAL and store precommits if we're in the validator set
if state.is_validator() {
// Append the vote to the Write-ahead Log
perform!(
co,
Effect::WalAppendMessage(
SignedConsensusMsg::Vote(signed_vote.clone()),
Default::default()
)
);

// Store the non-nil Precommits.
if signed_vote.vote_type() == VoteType::Precommit && signed_vote.value().is_val() {
state.store_signed_precommit(signed_vote.clone());
// Store the non-nil Precommits.
if signed_vote.vote_type() == VoteType::Precommit && signed_vote.value().is_val() {
state.store_signed_precommit(signed_vote.clone());
}
}

apply_driver_input(co, state, metrics, DriverInput::Vote(signed_vote)).await?;
Expand Down
7 changes: 7 additions & 0 deletions code/crates/core-consensus/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,4 +220,11 @@ where
);
}
}

/// Check if we are a validator node, i.e. we are present in the current validator set.
pub fn is_validator(&self) -> bool {
self.validator_set()
.get_by_address(self.address())
.is_some()
}
}
20 changes: 19 additions & 1 deletion code/crates/starknet/test/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,19 @@ impl<State> TestNode<State> {
self.steps.push(Step::Success);
self
}

pub fn full_node(&mut self) -> &mut Self {
self.voting_power = 0;
hoank101 marked this conversation as resolved.
Show resolved Hide resolved
// Ensure full nodes never participate in consensus
self.on_vote(|_vote, _state| {
panic!("Full nodes should never vote");
});

self.on_proposed_value(|_proposal, _state| {
panic!("Full nodes should never propose values");
});
self
}
}

fn unique_id() -> usize {
Expand Down Expand Up @@ -341,6 +354,7 @@ where
S: Send + Sync + 'static,
{
pub fn new(nodes: Vec<TestNode<S>>) -> Self {
// Only include nodes with non-zero voting power in the validator set
let vals_and_keys = make_validators(voting_powers(&nodes));
let (validators, private_keys): (Vec<_>, Vec<_>) = vals_and_keys.into_iter().unzip();
let validator_set = ValidatorSet::new(validators);
Expand Down Expand Up @@ -740,7 +754,11 @@ pub fn make_node_config<S>(test: &Test<S>, i: usize) -> NodeConfig {
}

fn voting_powers<S>(nodes: &[TestNode<S>]) -> Vec<VotingPower> {
hoank101 marked this conversation as resolved.
Show resolved Hide resolved
nodes.iter().map(|node| node.voting_power).collect()
nodes
.iter()
.filter(|node| node.voting_power > 0)
.map(|node| node.voting_power)
.collect()
}

pub fn make_validators(voting_powers: Vec<VotingPower>) -> Vec<(Validator, PrivateKey)> {
Expand Down
1 change: 1 addition & 0 deletions code/crates/starknet/test/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod full_nodes;
pub mod n3f0;
pub mod n3f0_consensus_mode;
pub mod n3f0_pubsub_protocol;
Expand Down
165 changes: 165 additions & 0 deletions code/crates/starknet/test/src/tests/full_nodes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
use std::time::Duration;

use crate::{init_logging, TestBuilder};

#[tokio::test]
pub async fn basic_full_node() {
init_logging(module_path!());

const HEIGHT: u64 = 5;

let mut test = TestBuilder::<()>::new();

// Add 3 validators with different voting powers
test.add_node()
.with_voting_power(10)
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.with_voting_power(20)
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.with_voting_power(30)
.start()
.wait_until(HEIGHT)
.success();

// Add 2 full nodes that should follow consensus but not participate
hoank101 marked this conversation as resolved.
Show resolved Hide resolved
test.add_node()
.full_node()
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.full_node()
.start()
.wait_until(HEIGHT)
.success();

test.build().run(Duration::from_secs(30)).await
}

#[tokio::test]
pub async fn full_node_crash_and_sync() {
init_logging(module_path!());

const HEIGHT: u64 = 10;

let mut test = TestBuilder::<()>::new();

// Add validators
test.add_node()
.with_voting_power(20)
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.with_voting_power(20)
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.with_voting_power(20)
.start()
.wait_until(HEIGHT)
.success();

// Add a full node that crashes and needs to sync
test.add_node()
.full_node()
.start()
.wait_until(3)
.crash()
.reset_db()
.restart_after(Duration::from_secs(5))
.wait_until(HEIGHT)
.success();

test.build().run(Duration::from_secs(60)).await
}

#[tokio::test]
pub async fn late_starting_full_node() {
init_logging(module_path!());

const HEIGHT: u64 = 10;

let mut test = TestBuilder::<()>::new();

// Add validators that start immediately
test.add_node()
.with_voting_power(20)
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.with_voting_power(20)
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.with_voting_power(20)
.start()
.wait_until(HEIGHT)
.success();

// Add a full node that starts late
test.add_node()
.full_node()
.start_after(1, Duration::from_secs(10))
.wait_until(HEIGHT)
.success();

test.build().run(Duration::from_secs(60)).await
}

#[tokio::test]
pub async fn mixed_validator_and_full_node_failures() {
init_logging(module_path!());

const HEIGHT: u64 = 10;

let mut test = TestBuilder::<()>::new();

// Add stable validators
test.add_node()
.with_voting_power(30)
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.with_voting_power(30)
.start()
.wait_until(HEIGHT)
.success();

// Add a validator that crashes
test.add_node()
.with_voting_power(20)
.start()
.wait_until(5)
.crash()
.restart_after(Duration::from_secs(10))
.wait_until(HEIGHT)
.success();

// Add full nodes - one stable, one that crashes
test.add_node()
.full_node()
.start()
.wait_until(HEIGHT)
.success();
test.add_node()
.full_node()
.start()
.wait_until(6)
.crash()
.restart_after(Duration::from_secs(15))
.wait_until(HEIGHT)
.success();

test.build().run(Duration::from_secs(60)).await
}
Loading