diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 87af078893..104cf5c120 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -521,11 +521,16 @@ impl BFT { return Ok(()); } - /* Proceeding to commit the leader. */ - info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader)); + // Commit the leader certificate if the primary is not syncing. + if !IS_SYNCING { + /* Proceeding to commit the leader. */ + info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader)); + // Commit the leader certificate, and all previous leader certificates since the last committed round. + self.commit_leader_certificate::(leader_certificate).await?; + } + + Ok(()) - // Commit the leader certificate, and all previous leader certificates since the last committed round. - self.commit_leader_certificate::(leader_certificate).await } /// Commits the leader certificate, and all previous leader certificates since the last committed round. @@ -814,6 +819,8 @@ impl BFT { mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup, mut rx_sync_bft, + mut rx_commit_bft, + mut rx_is_recently_committed, } = bft_receiver; // Process the current round from the primary. @@ -855,6 +862,30 @@ impl BFT { callback.send(result).ok(); } }); + + // Process the request to commit the leader certificate. + let self_ = self.clone(); + self.spawn(async move { + while let Some((certificate, callback)) = rx_commit_bft.recv().await { + // Update the DAG with the certificate. + let result = self_.commit_leader_certificate::(certificate).await; + // Send the callback **after** updating the DAG. + // Note: We must await the DAG update before proceeding. + callback.send(result).ok(); + } + }); + + // Process the request to check if the batch certificate was recently committed. + let self_ = self.clone(); + self.spawn(async move { + while let Some(((round, certificate_id), callback)) = rx_is_recently_committed.recv().await { + // Check if the certificate was recently committed. + let is_committed = self_.dag.read().is_recently_committed(round, certificate_id); + // Send the callback **after** updating the DAG. + // Note: We must await the DAG update before proceeding. + callback.send(is_committed).ok(); + } + }); } /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must** diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 0b685b0f85..7d9bb23646 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -29,7 +29,7 @@ use snarkvm::{ narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID}, puzzle::{Solution, SolutionID}, }, - prelude::Result, + prelude::{Field, Result}, }; use indexmap::IndexMap; @@ -66,6 +66,8 @@ pub struct BFTSender { pub tx_primary_certificate: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, pub tx_sync_bft_dag_at_bootup: mpsc::Sender>>, pub tx_sync_bft: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, + pub tx_commit_bft: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, + pub tx_is_recently_committed: mpsc::Sender<((u64, Field), oneshot::Sender)>, } impl BFTSender { @@ -98,6 +100,26 @@ impl BFTSender { // Await the callback to continue. callback_receiver.await? } + + /// Sends the leader certificate to the BFT to commit. + pub async fn send_commit_bft(&self, certificate: BatchCertificate) -> Result<()> { + // Initialize a callback sender and receiver. + let (callback_sender, callback_receiver) = oneshot::channel(); + // Send the certificate to the BFT. + self.tx_commit_bft.send((certificate, callback_sender)).await?; + // Await the callback to continue. + callback_receiver.await? + } + + /// Sends the certificate round and ID to the BFT to receive a callback on whether the certificate was recently committed. + pub async fn send_sync_is_recently_committed(&self, round: u64, certificate_id: Field) -> Result { + // Initialize a callback sender and receiver. + let (callback_sender, callback_receiver) = oneshot::channel(); + // Send the round and certificate ID to the BFT. + self.tx_is_recently_committed.send(((round, certificate_id), callback_sender)).await?; + // Await the callback to continue. + Ok(callback_receiver.await?) + } } #[derive(Debug)] @@ -106,6 +128,8 @@ pub struct BFTReceiver { pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, pub rx_sync_bft_dag_at_bootup: mpsc::Receiver>>, pub rx_sync_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, + pub rx_commit_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, + pub rx_is_recently_committed: mpsc::Receiver<((u64, Field), oneshot::Sender)>, } /// Initializes the BFT channels. @@ -114,9 +138,11 @@ pub fn init_bft_channels() -> (BFTSender, BFTReceiver) { let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_commit_bft, rx_commit_bft) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_is_recently_committed, rx_is_recently_committed) = mpsc::channel(MAX_CHANNEL_SIZE); - let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft }; - let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft }; + let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft, tx_commit_bft, tx_is_recently_committed }; + let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft, rx_commit_bft, rx_is_recently_committed }; (sender, receiver) } diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 46cb269a54..6ebf2ed52c 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -484,13 +484,46 @@ impl Sync { continue; } + if let Authority::Quorum(subdag) = block.authority() { + // Retrieve the leader certificate of the subdag. + let leader_certificate = subdag.leader_certificate(); + let leader_round = leader_certificate.round(); + let leader_author = leader_certificate.author(); + let leader_id = leader_certificate.id(); + + // If a BFT sender was provided, commit the leader certificate. + if let Some(bft_sender) = self.bft_sender.get() { + // Send the leader certificate to the BFT. + if let Err(e) = bft_sender.send_commit_bft(leader_certificate.clone()).await { + bail!("Sync - {e}"); + }; + + // Ensure that leader certificate was recently committed in the DAG. + match bft_sender.send_sync_is_recently_committed(leader_round, leader_id).await { + Ok(is_recently_committed) => { + if !is_recently_committed { + bail!( + "Sync - Failed to advance blocks - leader certificate with author {leader_author} from round {leader_round} was not recently committed.", + ); + } + debug!( + "Sync - Leader certificate with author {leader_author} from round {leader_round} was recently committed.", + ); + } + Err(e) => { + bail!("Sync - Failed to check if leader certificate was recently committed - {e}"); + } + }; + } + } + + // Advance the ledger state. let self_ = self.clone(); tokio::task::spawn_blocking(move || { // Check the next block. self_.ledger.check_next_block(&block)?; // Attempt to advance to the next block. self_.ledger.advance_to_next_block(&block)?; - // Sync the height with the block. self_.storage.sync_height_with_block(block.height()); // Sync the round with the block.