Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(driver): Turn get_value into an asynchronous step within the state machine #65

Merged
merged 15 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 45 additions & 53 deletions Code/driver/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use malachite_round::state_machine::RoundData;
use malachite_round::state_machine::Info;

use malachite_common::{
Context, Proposal, Round, SignedVote, Timeout, TimeoutStep, Validator, ValidatorSet, Value,
Expand All @@ -12,7 +12,6 @@ use malachite_vote::keeper::VoteKeeper;
use malachite_vote::Threshold;
use malachite_vote::ThresholdParams;

use crate::env::Env as DriverEnv;
use crate::event::Event;
use crate::message::Message;
use crate::Error;
Expand All @@ -21,14 +20,12 @@ use crate::Validity;

/// Driver for the state machine of the Malachite consensus engine at a given height.
#[derive(Clone, Debug)]
pub struct Driver<Ctx, Env, PSel>
pub struct Driver<Ctx, PSel>
where
Ctx: Context,
Env: DriverEnv<Ctx>,
PSel: ProposerSelector<Ctx>,
{
pub ctx: Ctx,
pub env: Env,
pub proposer_selector: PSel,

pub address: Ctx::Address,
Expand All @@ -38,15 +35,13 @@ where
pub round_state: RoundState<Ctx>,
}

impl<Ctx, Env, PSel> Driver<Ctx, Env, PSel>
impl<Ctx, PSel> Driver<Ctx, PSel>
where
Ctx: Context,
Env: DriverEnv<Ctx>,
PSel: ProposerSelector<Ctx>,
{
pub fn new(
ctx: Ctx,
env: Env,
proposer_selector: PSel,
validator_set: Ctx::ValidatorSet,
address: Ctx::Address,
Expand All @@ -58,7 +53,6 @@ where

Self {
ctx,
env,
proposer_selector,
address,
validator_set,
Expand All @@ -75,10 +69,17 @@ where
self.round_state.round
}

async fn get_value(&self) -> Option<Ctx::Value> {
self.env
.get_value(self.height().clone(), self.round())
.await
pub fn get_proposer(&self, round: Round) -> Result<&Ctx::Validator, Error<Ctx>> {
let address = self
.proposer_selector
.select_proposer(round, &self.validator_set);

let proposer = self
.validator_set
.get_by_address(&address)
.ok_or_else(|| Error::ProposerNotFound(address))?;

Ok(proposer)
}

pub async fn execute(&mut self, msg: Event<Ctx>) -> Result<Option<Message<Ctx>>, Error<Ctx>> {
Expand All @@ -102,6 +103,10 @@ where

RoundMessage::ScheduleTimeout(timeout) => Message::ScheduleTimeout(timeout),

RoundMessage::GetValueAndScheduleTimeout(round, timeout) => {
Message::GetValueAndScheduleTimeout(round, timeout)
}

RoundMessage::Decision(value) => {
// TODO: update the state
Message::Decide(value.round, value.value)
Expand All @@ -114,14 +119,10 @@ where
async fn apply(&mut self, event: Event<Ctx>) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
match event {
Event::NewRound(height, round) => self.apply_new_round(height, round).await,

Event::Proposal(proposal, validity) => {
Ok(self.apply_proposal(proposal, validity).await)
}

Event::ProposeValue(round, value) => self.apply_propose_value(round, value).await,
Event::Proposal(proposal, validity) => self.apply_proposal(proposal, validity).await,
Event::Vote(signed_vote) => self.apply_vote(signed_vote),

Event::TimeoutElapsed(timeout) => Ok(self.apply_timeout(timeout)),
Event::TimeoutElapsed(timeout) => self.apply_timeout(timeout),
}
}

Expand All @@ -132,56 +133,42 @@ where
) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
self.round_state = RoundState::new(height, round);

let proposer_address = self
.proposer_selector
.select_proposer(round, &self.validator_set);

let proposer = self
.validator_set
.get_by_address(&proposer_address)
.ok_or_else(|| Error::ProposerNotFound(proposer_address.clone()))?;

let event = if proposer.address() == &self.address {
// We are the proposer
// TODO: Schedule propose timeout

let Some(value) = self.get_value().await else {
return Err(Error::NoValueToPropose);
};

RoundEvent::NewRoundProposer(value)
} else {
RoundEvent::NewRound
};
self.apply_event(round, RoundEvent::NewRound)
}

Ok(self.apply_event(round, event))
async fn apply_propose_value(
&mut self,
round: Round,
value: Ctx::Value,
) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
self.apply_event(round, RoundEvent::ProposeValue(value))
}

async fn apply_proposal(
&mut self,
proposal: Ctx::Proposal,
validity: Validity,
) -> Option<RoundMessage<Ctx>> {
) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
// Check that there is an ongoing round
if self.round_state.round == Round::NIL {
return None;
return Ok(None);
}

// Only process the proposal if there is no other proposal
if self.round_state.proposal.is_some() {
return None;
return Ok(None);
}

// Check that the proposal is for the current height and round
if self.round_state.height != proposal.height()
|| self.round_state.round != proposal.round()
{
return None;
return Ok(None);
}

// TODO: Document
if proposal.pol_round().is_defined() && proposal.pol_round() >= self.round_state.round {
return None;
return Ok(None);
}

// TODO: Verify proposal signature (make some of these checks part of message validation)
Expand Down Expand Up @@ -215,7 +202,7 @@ where

self.apply_event(round, event)
}
_ => None,
_ => Ok(None),
}
}

Expand Down Expand Up @@ -257,10 +244,10 @@ where
VoteMessage::SkipRound(r) => RoundEvent::SkipRound(r),
};

Ok(self.apply_event(vote_round, round_event))
self.apply_event(vote_round, round_event)
}

fn apply_timeout(&mut self, timeout: Timeout) -> Option<RoundMessage<Ctx>> {
fn apply_timeout(&mut self, timeout: Timeout) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
let event = match timeout.step {
TimeoutStep::Propose => RoundEvent::TimeoutPropose,
TimeoutStep::Prevote => RoundEvent::TimeoutPrevote,
Expand All @@ -271,10 +258,15 @@ where
}

/// Apply the event, update the state.
fn apply_event(&mut self, round: Round, event: RoundEvent<Ctx>) -> Option<RoundMessage<Ctx>> {
fn apply_event(
&mut self,
event_round: Round,
event: RoundEvent<Ctx>,
) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
let round_state = core::mem::take(&mut self.round_state);
let proposer = self.get_proposer(round_state.round)?;

let data = RoundData::new(round, round_state.height.clone(), &self.address);
let data = Info::new(event_round, &self.address, proposer.address());

// Multiplex the event with the round state.
let mux_event = match event {
Expand All @@ -301,6 +293,6 @@ where
self.round_state = transition.next_state;

// Return message, if any
transition.message
Ok(transition.message)
}
}
19 changes: 0 additions & 19 deletions Code/driver/src/env.rs

This file was deleted.

5 changes: 0 additions & 5 deletions Code/driver/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ pub enum Error<Ctx>
where
Ctx: Context,
{
/// No value to propose
NoValueToPropose,

/// Proposer not found
ProposerNotFound(Ctx::Address),

Expand All @@ -27,7 +24,6 @@ where
#[cfg_attr(coverage_nightly, coverage(off))]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::NoValueToPropose => write!(f, "No value to propose"),
Error::ProposerNotFound(addr) => write!(f, "Proposer not found: {addr}"),
Error::ValidatorNotFound(addr) => write!(f, "Validator not found: {addr}"),
Error::InvalidVoteSignature(vote, validator) => write!(
Expand All @@ -46,7 +42,6 @@ where
#[cfg_attr(coverage_nightly, coverage(off))]
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Error::NoValueToPropose, Error::NoValueToPropose) => true,
(Error::ProposerNotFound(addr1), Error::ProposerNotFound(addr2)) => addr1 == addr2,
(Error::ValidatorNotFound(addr1), Error::ValidatorNotFound(addr2)) => addr1 == addr2,
(
Expand Down
10 changes: 10 additions & 0 deletions Code/driver/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,18 @@ pub enum Event<Ctx>
where
Ctx: Context,
{
/// Start a new round
NewRound(Ctx::Height, Round),

/// Propose a value for the given round
ProposeValue(Round, Ctx::Value),

/// Receive a proposal, of the given validity
Proposal(Ctx::Proposal, Validity),

/// Receive a signed vote
Vote(SignedVote<Ctx>),

/// Receive a timeout
TimeoutElapsed(Timeout),
}
2 changes: 0 additions & 2 deletions Code/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
extern crate alloc;

mod driver;
mod env;
mod error;
mod event;
mod message;
mod proposer;
mod util;

pub use driver::Driver;
pub use env::Env;
pub use error::Error;
pub use event::Event;
pub use message::Message;
Expand Down
34 changes: 28 additions & 6 deletions Code/driver/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,23 @@ pub enum Message<Ctx>
where
Ctx: Context,
{
/// Start a new round
NewRound(Ctx::Height, Round),

/// Broadcast a proposal
Propose(Ctx::Proposal),

/// Broadcast a vote for a value
Vote(SignedVote<Ctx>),

/// Decide on a value
Decide(Round, Ctx::Value),

/// Schedule a timeout
ScheduleTimeout(Timeout),
NewRound(Ctx::Height, Round),

/// Ask for a value to propose and schedule a timeout
GetValueAndScheduleTimeout(Round, Timeout),
}

// NOTE: We have to derive these instances manually, otherwise
Expand All @@ -22,11 +34,14 @@ impl<Ctx: Context> Clone for Message<Ctx> {
#[cfg_attr(coverage_nightly, coverage(off))]
fn clone(&self) -> Self {
match self {
Message::NewRound(height, round) => Message::NewRound(height.clone(), *round),
Message::Propose(proposal) => Message::Propose(proposal.clone()),
Message::Vote(signed_vote) => Message::Vote(signed_vote.clone()),
Message::Decide(round, value) => Message::Decide(*round, value.clone()),
Message::ScheduleTimeout(timeout) => Message::ScheduleTimeout(*timeout),
Message::NewRound(height, round) => Message::NewRound(height.clone(), *round),
Message::GetValueAndScheduleTimeout(round, timeout) => {
Message::GetValueAndScheduleTimeout(*round, *timeout)
}
}
}
}
Expand All @@ -35,11 +50,14 @@ impl<Ctx: Context> fmt::Debug for Message<Ctx> {
#[cfg_attr(coverage_nightly, coverage(off))]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Message::NewRound(height, round) => write!(f, "NewRound({:?}, {:?})", height, round),
Message::Propose(proposal) => write!(f, "Propose({:?})", proposal),
Message::Vote(signed_vote) => write!(f, "Vote({:?})", signed_vote),
Message::Decide(round, value) => write!(f, "Decide({:?}, {:?})", round, value),
Message::ScheduleTimeout(timeout) => write!(f, "ScheduleTimeout({:?})", timeout),
Message::NewRound(height, round) => write!(f, "NewRound({:?}, {:?})", height, round),
Message::GetValueAndScheduleTimeout(round, timeout) => {
write!(f, "GetValueAndScheduleTimeout({:?}, {:?})", round, timeout)
}
}
}
}
Expand All @@ -48,6 +66,9 @@ impl<Ctx: Context> PartialEq for Message<Ctx> {
#[cfg_attr(coverage_nightly, coverage(off))]
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Message::NewRound(height, round), Message::NewRound(other_height, other_round)) => {
height == other_height && round == other_round
}
(Message::Propose(proposal), Message::Propose(other_proposal)) => {
proposal == other_proposal
}
Expand All @@ -60,9 +81,10 @@ impl<Ctx: Context> PartialEq for Message<Ctx> {
(Message::ScheduleTimeout(timeout), Message::ScheduleTimeout(other_timeout)) => {
timeout == other_timeout
}
(Message::NewRound(height, round), Message::NewRound(other_height, other_round)) => {
height == other_height && round == other_round
}
(
Message::GetValueAndScheduleTimeout(round, timeout),
Message::GetValueAndScheduleTimeout(other_round, other_timeout),
) => round == other_round && timeout == other_timeout,
_ => false,
}
}
Expand Down
Loading
Loading