Skip to content

Commit

Permalink
wip: Replay WAL entries on restart
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Nov 26, 2024
1 parent 2b08aa2 commit 4fe607b
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 75 deletions.
183 changes: 147 additions & 36 deletions code/crates/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use libp2p::PeerId;
use ractor::{Actor, ActorProcessingErr, ActorRef, RpcReplyPort};
use tokio::sync::broadcast;
use tokio::time::Instant;
use tracing::trace;
use tracing::{debug, error, info, warn};

use malachite_blocksync as blocksync;
Expand All @@ -15,7 +16,7 @@ use malachite_common::{
ValueOrigin,
};
use malachite_config::TimeoutConfig;
use malachite_consensus::{Effect, Resume, ValueToPropose};
use malachite_consensus::{Effect, Resume, SignedConsensusMsg, ValueToPropose};
use malachite_metrics::Metrics;

use crate::block_sync::BlockSyncRef;
Expand All @@ -24,6 +25,7 @@ use crate::gossip_consensus::{GossipConsensusRef, GossipEvent, Msg as GossipCons
use crate::host::{HostMsg, HostRef, LocallyProposedValue, ProposedValue};
use crate::util::forward::forward;
use crate::util::timers::{TimeoutElapsed, TimerScheduler};
use crate::wal::WalEntry;
use crate::wal::{Msg as WalMsg, WalRef};

pub use malachite_consensus::Error as ConsensusError;
Expand Down Expand Up @@ -114,6 +116,13 @@ impl Timeouts {
}
}

#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum Phase {
Unstarted,
Running,
Recovering,
}

pub struct State<Ctx: Context> {
/// Scheduler for timers
timers: Timers<Ctx>,
Expand All @@ -126,6 +135,9 @@ pub struct State<Ctx: Context> {

/// The set of peers we are connected to.
connected_peers: BTreeSet<PeerId>,

/// The current phase
phase: Phase,
}

impl<Ctx> State<Ctx>
Expand Down Expand Up @@ -207,7 +219,14 @@ where
state: &mut state.consensus,
metrics: &self.metrics,
with: effect => {
self.handle_effect(myself, height, &mut state.timers, &mut state.timeouts, effect).await
self.handle_effect(
myself,
height,
&mut state.timers,
&mut state.timeouts,
state.phase,
effect
).await
}
)
}
Expand All @@ -220,15 +239,7 @@ where
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::StartHeight(height) => {
if let Err(e) = self.wal.cast(WalMsg::StartedHeight(height)) {
error!(%height, "Error when notifying WAL of started height: {e}");
}

if let Some(block_sync) = &self.block_sync {
if let Err(e) = block_sync.cast(BlockSyncMsg::StartHeight(height)) {
error!(%height, "Error when notifying BlockSync of started height: {e}")
}
}
state.phase = Phase::Running;

let validator_set = self.get_validator_set(height).await?;

Expand All @@ -244,6 +255,16 @@ where
error!(%height, "Error when starting height: {e}");
}

if let Err(e) = self.check_and_replay_wal(&myself, state, height).await {
error!(%height, "Error when checking and replaying WAL: {e}");
}

if let Some(block_sync) = &self.block_sync {
if let Err(e) = block_sync.cast(BlockSyncMsg::StartHeight(height)) {
error!(%height, "Error when notifying BlockSync of started height: {e}")
}
}

Ok(())
}

Expand All @@ -263,7 +284,7 @@ where
.await;

if let Err(e) = result {
error!("Error when processing ProposeValue message: {e:?}");
error!("Error when processing ProposeValue message: {e}");
}

Ok(())
Expand Down Expand Up @@ -344,7 +365,7 @@ where
)
.await
{
error!(%height, %request_id, "Error when processing received synced block: {e:?}");
error!(%height, %request_id, "Error when processing received synced block: {e}");

let Some(block_sync) = self.block_sync.as_ref() else {
warn!("Received BlockSync response but BlockSync actor is not available");
Expand All @@ -356,7 +377,7 @@ where
.cast(BlockSyncMsg::InvalidCertificate(peer, certificate, e))
.map_err(|e| {
eyre!(
"Error when notifying BlockSync of invalid certificate: {e:?}"
"Error when notifying BlockSync of invalid certificate: {e}"
)
})?;
}
Expand All @@ -368,7 +389,7 @@ where
.process_input(&myself, state, ConsensusInput::Vote(vote))
.await
{
error!(%from, "Error when processing vote: {e:?}");
error!(%from, "Error when processing vote: {e}");
}
}

Expand All @@ -382,7 +403,7 @@ where
.process_input(&myself, state, ConsensusInput::Proposal(proposal))
.await
{
error!(%from, "Error when processing proposal: {e:?}");
error!(%from, "Error when processing proposal: {e}");
}
}

Expand All @@ -404,7 +425,7 @@ where
None,
)
.map_err(|e| {
eyre!("Error when forwarding proposal parts to host: {e:?}")
eyre!("Error when forwarding proposal parts to host: {e}")
})?;
}

Expand All @@ -420,20 +441,8 @@ where
return Ok(());
};

state.timeouts.increase_timeout(timeout.step);

if matches!(timeout.step, TimeoutStep::Prevote | TimeoutStep::Precommit) {
warn!(step = ?timeout.step, "Timeout elapsed");

state.consensus.print_state();
}

let result = self
.process_input(&myself, state, ConsensusInput::TimeoutElapsed(timeout))
.await;

if let Err(e) = result {
error!("Error when processing TimeoutElapsed message: {e:?}");
if let Err(e) = self.timeout_elapsed(&myself, state, timeout).await {
error!("Error when processing TimeoutElapsed message: {e}");
}

Ok(())
Expand All @@ -445,7 +454,7 @@ where
.await;

if let Err(e) = result {
error!("Error when processing GossipEvent message: {e:?}");
error!("Error when processing GossipEvent message: {e}");
}

Ok(())
Expand All @@ -456,14 +465,104 @@ where
let status = Status::new(state.consensus.driver.height(), earliest_block_height);

if let Err(e) = reply_to.send(status) {
error!("Error when replying to GetStatus message: {e:?}");
error!("Error when replying to GetStatus message: {e}");
}

Ok(())
}
}
}

async fn timeout_elapsed(
&self,
myself: &ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
timeout: Timeout,
) -> Result<(), ActorProcessingErr> {
state.timeouts.increase_timeout(timeout.step);

if matches!(timeout.step, TimeoutStep::Prevote | TimeoutStep::Precommit) {
warn!(step = ?timeout.step, "Timeout elapsed");
state.consensus.print_state();
}

self.process_input(myself, state, ConsensusInput::TimeoutElapsed(timeout))
.await?;

Ok(())
}

async fn check_and_replay_wal(
&self,
myself: &ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
height: Ctx::Height,
) -> Result<(), ActorProcessingErr> {
let result = ractor::call!(self.wal, WalMsg::StartedHeight, height)?;

match result {
Ok(None) => {
// Nothing to replay
trace!(%height, "No WAL entries to replay");
}
Ok(Some(entries)) => {
info!("Found {} WAL entries to replay", entries.len());

state.phase = Phase::Recovering;

if let Err(e) = self.replay_wal_entries(myself, state, entries).await {
error!(%height, "Failed to replay WAL entries: {e}");
}

state.phase = Phase::Running;
}
Err(e) => {
error!(%height, "Error when notifying WAL of started height: {e}")
}
}

Ok(())
}

async fn replay_wal_entries(
&self,
myself: &ActorRef<Msg<Ctx>>,
state: &mut State<Ctx>,
entries: Vec<WalEntry<Ctx>>,
) -> Result<(), ActorProcessingErr> {
use SignedConsensusMsg::*;

for entry in entries {
match entry {
WalEntry::ConsensusMsg(Vote(vote)) => {
if let Err(e) = self
.process_input(myself, state, ConsensusInput::Vote(vote))
.await
{
error!("Error when replaying Vote: {e}");
}
}

WalEntry::ConsensusMsg(Proposal(proposal)) => {
if let Err(e) = self
.process_input(myself, state, ConsensusInput::Proposal(proposal))
.await
{
error!("Error when replaying Proposal: {e}");
}
}

WalEntry::Timeout(timeout) => {
if let Err(e) = self.timeout_elapsed(myself, state, timeout).await {
error!("Error when replaying TimeoutElapsed: {e}");
}
}
}
}

Ok(())
}

fn get_value(
&self,
myself: &ActorRef<Msg<Ctx>>,
Expand Down Expand Up @@ -522,6 +621,7 @@ where
height: Ctx::Height,
timers: &mut Timers<Ctx>,
timeouts: &mut Timeouts,
phase: Phase,
effect: Effect<Ctx>,
) -> Result<Resume<Ctx>, ActorProcessingErr> {
match effect {
Expand All @@ -548,9 +648,11 @@ where
}

Effect::StartRound(height, round, proposer) => {
// FIXME: Handle error in ok case
if let Err(e) = ractor::call!(self.wal, WalMsg::Sync) {
error!("Failed to flush WAL to disk: {e}");
if phase != Phase::Recovering {
// FIXME: Handle error in ok case
if let Err(e) = ractor::call!(self.wal, WalMsg::Sync) {
error!("Failed to flush WAL to disk: {e}");
}
}

self.host.cast(HostMsg::StartedRound {
Expand Down Expand Up @@ -660,6 +762,10 @@ where
}

Effect::PersistMessage(msg) => {
if phase == Phase::Recovering {
return Ok(Resume::Continue);
}

let result = ractor::call!(self.wal, WalMsg::WriteMsg, msg);

match result {
Expand All @@ -672,6 +778,10 @@ where
}

Effect::PersistTimeout(timeout) => {
if phase == Phase::Recovering {
return Ok(Resume::Continue);
}

let result = ractor::call!(self.wal, WalMsg::WriteTimeout, height, timeout);

match result {
Expand Down Expand Up @@ -710,6 +820,7 @@ where
timeouts: Timeouts::new(self.timeout_config),
consensus: ConsensusState::new(self.ctx.clone(), self.params.clone()),
connected_peers: BTreeSet::new(),
phase: Phase::Unstarted,
})
}

Expand Down
Loading

0 comments on commit 4fe607b

Please sign in to comment.