From ff23fd66e72b466c58473713ca3e9ed7512f0914 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Tue, 27 Aug 2024 22:38:19 -0400 Subject: [PATCH 1/4] Adds channel for committing leader certificates in bft --- node/bft/src/bft.rs | 1 + node/bft/src/helpers/channels.rs | 17 +++++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 1a0b803f34..9ce92e55a0 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -813,6 +813,7 @@ impl BFT { mut rx_primary_certificate, mut rx_sync_bft_dag_at_bootup, mut rx_sync_bft, + mut rx_commit_bft, } = bft_receiver; // Process the current round from the primary. diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 70c6ad617b..2bcac39b82 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -65,6 +65,7 @@ pub struct BFTSender { pub tx_primary_certificate: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, pub tx_sync_bft_dag_at_bootup: mpsc::Sender>>, pub tx_sync_bft: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, + pub tx_commit_bft: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, } impl BFTSender { @@ -97,6 +98,16 @@ impl BFTSender { // Await the callback to continue. callback_receiver.await? } + + /// Sends the leader certificate to the BFT to commit. + pub async fn send_commit_bft(&self, certificate: BatchCertificate) -> Result<()> { + // Initialize a callback sender and receiver. + let (callback_sender, callback_receiver) = oneshot::channel(); + // Send the certificate to the BFT. + self.tx_commit_bft.send((certificate, callback_sender)).await?; + // Await the callback to continue. + callback_receiver.await? + } } #[derive(Debug)] @@ -105,6 +116,7 @@ pub struct BFTReceiver { pub rx_primary_certificate: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, pub rx_sync_bft_dag_at_bootup: mpsc::Receiver>>, pub rx_sync_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, + pub rx_commit_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, } /// Initializes the BFT channels. @@ -113,9 +125,10 @@ pub fn init_bft_channels() -> (BFTSender, BFTReceiver) { let (tx_primary_certificate, rx_primary_certificate) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_commit_bft, rx_commit_bft) = mpsc::channel(MAX_CHANNEL_SIZE); - let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft }; - let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft }; + let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft, tx_commit_bft }; + let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft, rx_commit_bft }; (sender, receiver) } From 15a566baf261ded8249758b58e695e6aa0dce2d5 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Tue, 27 Aug 2024 22:56:18 -0400 Subject: [PATCH 2/4] Adds channel for recently committed certificates in bft --- node/bft/src/bft.rs | 1 + node/bft/src/helpers/channels.rs | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 9ce92e55a0..71c09664d2 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -814,6 +814,7 @@ impl BFT { mut rx_sync_bft_dag_at_bootup, mut rx_sync_bft, mut rx_commit_bft, + mut rx_is_recently_committed, } = bft_receiver; // Process the current round from the primary. diff --git a/node/bft/src/helpers/channels.rs b/node/bft/src/helpers/channels.rs index 2bcac39b82..da431a7912 100644 --- a/node/bft/src/helpers/channels.rs +++ b/node/bft/src/helpers/channels.rs @@ -28,7 +28,7 @@ use snarkvm::{ narwhal::{BatchCertificate, Data, Subdag, Transmission, TransmissionID}, puzzle::{Solution, SolutionID}, }, - prelude::Result, + prelude::{Field, Result}, }; use indexmap::IndexMap; @@ -66,6 +66,7 @@ pub struct BFTSender { pub tx_sync_bft_dag_at_bootup: mpsc::Sender>>, pub tx_sync_bft: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, pub tx_commit_bft: mpsc::Sender<(BatchCertificate, oneshot::Sender>)>, + pub tx_is_recently_committed: mpsc::Sender<((u64, Field), oneshot::Sender)>, } impl BFTSender { @@ -108,6 +109,16 @@ impl BFTSender { // Await the callback to continue. callback_receiver.await? } + + /// Sends the certificate round and ID to the BFT to receive a callback on whether the certificate was recently committed. + pub async fn send_sync_is_recently_committed(&self, round: u64, certificate_id: Field) -> Result { + // Initialize a callback sender and receiver. + let (callback_sender, callback_receiver) = oneshot::channel(); + // Send the round and certificate ID to the BFT. + self.tx_is_recently_committed.send(((round, certificate_id), callback_sender)).await?; + // Await the callback to continue. + Ok(callback_receiver.await?) + } } #[derive(Debug)] @@ -117,6 +128,7 @@ pub struct BFTReceiver { pub rx_sync_bft_dag_at_bootup: mpsc::Receiver>>, pub rx_sync_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, pub rx_commit_bft: mpsc::Receiver<(BatchCertificate, oneshot::Sender>)>, + pub rx_is_recently_committed: mpsc::Receiver<((u64, Field), oneshot::Sender)>, } /// Initializes the BFT channels. @@ -126,9 +138,10 @@ pub fn init_bft_channels() -> (BFTSender, BFTReceiver) { let (tx_sync_bft_dag_at_bootup, rx_sync_bft_dag_at_bootup) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_sync_bft, rx_sync_bft) = mpsc::channel(MAX_CHANNEL_SIZE); let (tx_commit_bft, rx_commit_bft) = mpsc::channel(MAX_CHANNEL_SIZE); + let (tx_is_recently_committed, rx_is_recently_committed) = mpsc::channel(MAX_CHANNEL_SIZE); - let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft, tx_commit_bft }; - let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft, rx_commit_bft }; + let sender = BFTSender { tx_primary_round, tx_primary_certificate, tx_sync_bft_dag_at_bootup, tx_sync_bft, tx_commit_bft, tx_is_recently_committed }; + let receiver = BFTReceiver { rx_primary_round, rx_primary_certificate, rx_sync_bft_dag_at_bootup, rx_sync_bft, rx_commit_bft, rx_is_recently_committed }; (sender, receiver) } From c19aff3b4a617edcc50737bec9a676c9ecb104a1 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:03:58 -0400 Subject: [PATCH 3/4] Adds to bft start handlers --- node/bft/src/bft.rs | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 71c09664d2..2a438f425f 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -856,6 +856,30 @@ impl BFT { callback.send(result).ok(); } }); + + // Process the request to commit the leader certificate. + let self_ = self.clone(); + self.spawn(async move { + while let Some((certificate, callback)) = rx_commit_bft.recv().await { + // Update the DAG with the certificate. + let result = self_.commit_leader_certificate::(certificate).await; + // Send the callback **after** updating the DAG. + // Note: We must await the DAG update before proceeding. + callback.send(result).ok(); + } + }); + + // Process the request to check if the batch certificate was recently committed. + let self_ = self.clone(); + self.spawn(async move { + while let Some(((round, certificate_id), callback)) = rx_is_recently_committed.recv().await { + // Check if the certificate was recently committed. + let is_committed = self_.dag.read().is_recently_committed(round, certificate_id); + // Send the callback **after** updating the DAG. + // Note: We must await the DAG update before proceeding. + callback.send(is_committed).ok(); + } + }); } /// Syncs the BFT DAG with the given batch certificates. These batch certificates **must** From bae2f49e56935b990ddebe72c29870d917260ad2 Mon Sep 17 00:00:00 2001 From: mdelle1 <108158289+mdelle1@users.noreply.github.com> Date: Tue, 27 Aug 2024 23:14:59 -0400 Subject: [PATCH 4/4] Couples replicating DAG state to advancing with sync blocks --- node/bft/src/bft.rs | 13 +++++++++---- node/bft/src/sync/mod.rs | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/node/bft/src/bft.rs b/node/bft/src/bft.rs index 2a438f425f..c133b96e9b 100644 --- a/node/bft/src/bft.rs +++ b/node/bft/src/bft.rs @@ -520,11 +520,16 @@ impl BFT { return Ok(()); } - /* Proceeding to commit the leader. */ - info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader)); + // Commit the leader certificate if the primary is not syncing. + if !IS_SYNCING { + /* Proceeding to commit the leader. */ + info!("Proceeding to commit round {commit_round} with leader '{}'", fmt_id(leader)); + // Commit the leader certificate, and all previous leader certificates since the last committed round. + self.commit_leader_certificate::(leader_certificate).await?; + } + + Ok(()) - // Commit the leader certificate, and all previous leader certificates since the last committed round. - self.commit_leader_certificate::(leader_certificate).await } /// Commits the leader certificate, and all previous leader certificates since the last committed round. diff --git a/node/bft/src/sync/mod.rs b/node/bft/src/sync/mod.rs index 9da5779538..6a9df577ce 100644 --- a/node/bft/src/sync/mod.rs +++ b/node/bft/src/sync/mod.rs @@ -483,13 +483,46 @@ impl Sync { continue; } + if let Authority::Quorum(subdag) = block.authority() { + // Retrieve the leader certificate of the subdag. + let leader_certificate = subdag.leader_certificate(); + let leader_round = leader_certificate.round(); + let leader_author = leader_certificate.author(); + let leader_id = leader_certificate.id(); + + // If a BFT sender was provided, commit the leader certificate. + if let Some(bft_sender) = self.bft_sender.get() { + // Send the leader certificate to the BFT. + if let Err(e) = bft_sender.send_commit_bft(leader_certificate.clone()).await { + bail!("Sync - {e}"); + }; + + // Ensure that leader certificate was recently committed in the DAG. + match bft_sender.send_sync_is_recently_committed(leader_round, leader_id).await { + Ok(is_recently_committed) => { + if !is_recently_committed { + bail!( + "Sync - Failed to advance blocks - leader certificate with author {leader_author} from round {leader_round} was not recently committed.", + ); + } + debug!( + "Sync - Leader certificate with author {leader_author} from round {leader_round} was recently committed.", + ); + } + Err(e) => { + bail!("Sync - Failed to check if leader certificate was recently committed - {e}"); + } + }; + } + } + + // Advance the ledger state. let self_ = self.clone(); tokio::task::spawn_blocking(move || { // Check the next block. self_.ledger.check_next_block(&block)?; // Attempt to advance to the next block. self_.ledger.advance_to_next_block(&block)?; - // Sync the height with the block. self_.storage.sync_height_with_block(block.height()); // Sync the round with the block.