From 9f15194c2aeacb7a3a69bfee81110af7a6068071 Mon Sep 17 00:00:00 2001 From: Greg Szabo Date: Fri, 22 Nov 2024 08:19:45 +0100 Subject: [PATCH] initial commit --- code/Cargo.lock | 18 +++ code/Cargo.toml | 2 + code/crates/app-channel/Cargo.toml | 26 ++++ code/crates/app-channel/src/actor.rs | 181 +++++++++++++++++++++++++ code/crates/app-channel/src/channel.rs | 75 ++++++++++ code/crates/app-channel/src/lib.rs | 4 + code/crates/app-channel/src/run.rs | 86 ++++++++++++ code/crates/app-channel/src/spawn.rs | 154 +++++++++++++++++++++ 8 files changed, 546 insertions(+) create mode 100644 code/crates/app-channel/Cargo.toml create mode 100644 code/crates/app-channel/src/actor.rs create mode 100644 code/crates/app-channel/src/channel.rs create mode 100644 code/crates/app-channel/src/lib.rs create mode 100644 code/crates/app-channel/src/run.rs create mode 100644 code/crates/app-channel/src/spawn.rs diff --git a/code/Cargo.lock b/code/Cargo.lock index 831a6535e..2f7f8f5b9 100644 --- a/code/Cargo.lock +++ b/code/Cargo.lock @@ -2457,6 +2457,24 @@ dependencies = [ "tracing", ] +[[package]] +name = "malachite-app-channel" +version = "0.1.0" +dependencies = [ + "bytes", + "libp2p-identity", + "malachite-actors", + "malachite-blocksync", + "malachite-common", + "malachite-config", + "malachite-consensus", + "malachite-gossip-consensus", + "malachite-metrics", + "malachite-node", + "ractor", + "tokio", +] + [[package]] name = "malachite-blocksync" version = "0.1.0" diff --git a/code/Cargo.toml b/code/Cargo.toml index 727753139..b8606f666 100644 --- a/code/Cargo.toml +++ b/code/Cargo.toml @@ -3,6 +3,7 @@ resolver = "2" members = [ "crates/actors", + "crates/app-channel", "crates/blocksync", "crates/cli", "crates/common", @@ -54,6 +55,7 @@ unused_crate_dependencies = "warn" [workspace.dependencies] malachite-actors = { version = "0.1.0", path = "crates/actors" } +malachite-app-channel = { version = "0.1.0", path = "crates/app-channel" } malachite-blocksync = { version = "0.1.0", path = "crates/blocksync" } malachite-cli = { version = "0.1.0", path = "crates/cli" } malachite-common = { version = "0.1.0", path = "crates/common" } diff --git a/code/crates/app-channel/Cargo.toml b/code/crates/app-channel/Cargo.toml new file mode 100644 index 000000000..82e9be5e4 --- /dev/null +++ b/code/crates/app-channel/Cargo.toml @@ -0,0 +1,26 @@ +[package] +name = "malachite-app-channel" +version.workspace = true +edition.workspace = true +repository.workspace = true +license.workspace = true +publish.workspace = true +rust-version.workspace = true + +[dependencies] +bytes.workspace = true +libp2p-identity.workspace = true +ractor.workspace = true +tokio.workspace = true + +malachite-actors.workspace = true +malachite-blocksync.workspace = true +malachite-common.workspace = true +malachite-config.workspace = true +malachite-consensus.workspace = true +malachite-gossip-consensus.workspace = true +malachite-metrics.workspace = true +malachite-node.workspace = true + +[lints] +workspace = true diff --git a/code/crates/app-channel/src/actor.rs b/code/crates/app-channel/src/actor.rs new file mode 100644 index 000000000..e8861f368 --- /dev/null +++ b/code/crates/app-channel/src/actor.rs @@ -0,0 +1,181 @@ +use crate::channel::ChannelMsg; +use malachite_actors::host::HostMsg; +use malachite_common::Context; +use malachite_metrics::Metrics; +use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, RpcReplyPort, SpawnErr}; +use tokio::sync::mpsc::Sender; +use tokio::sync::oneshot::Sender as OneShotSender; + +pub struct Connector +where + Ctx: Context, +{ + sender: Sender>, + // Todo: add some metrics + #[allow(dead_code)] + metrics: Metrics, +} + +impl Connector +where + Ctx: Context, +{ + pub fn new(sender: Sender>, metrics: Metrics) -> Self { + Connector { sender, metrics } + } + + pub async fn spawn( + sender: Sender>, + metrics: Metrics, + ) -> Result>, SpawnErr> + where + Ctx: Context, + { + let (actor_ref, _) = Actor::spawn(None, Self::new(sender, metrics), ()).await?; + Ok(actor_ref) + } +} + +#[async_trait] +impl Actor for Connector +where + Ctx: Context, +{ + type Msg = HostMsg; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef, + _args: Self::Arguments, + ) -> Result { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + match message { + HostMsg::StartRound { + height, + round, + proposer, + } => { + self.sender + .send(ChannelMsg::StartRound { + height, + round, + proposer, + }) + .await? + } + HostMsg::GetValue { + height, + round, + timeout_duration, + address, + reply_to, + } => { + let reply_to = create_reply_channel(reply_to).await?; + + self.sender + .send(ChannelMsg::GetValue { + height, + round, + timeout_duration, + address, + reply_to, + }) + .await? + } + HostMsg::RestreamValue { + height, + round, + valid_round, + address, + value_id, + } => { + self.sender + .send(ChannelMsg::RestreamValue { + height, + round, + valid_round, + address, + value_id, + }) + .await? + } + HostMsg::GetEarliestBlockHeight { reply_to } => { + let reply_to = create_reply_channel(reply_to).await?; + self.sender + .send(ChannelMsg::GetEarliestBlockHeight { reply_to }) + .await?; + } + HostMsg::ReceivedProposalPart { + from, + part, + reply_to, + } => { + let reply_to = create_reply_channel(reply_to).await?; + self.sender + .send(ChannelMsg::ReceivedProposalPart { + from, + part, + reply_to, + }) + .await?; + } + HostMsg::GetValidatorSet { height, reply_to } => { + let reply_to = create_reply_channel(reply_to).await?; + self.sender + .send(ChannelMsg::GetValidatorSet { height, reply_to }) + .await?; + } + HostMsg::Decide { certificate, .. } => { + self.sender.send(ChannelMsg::Decide { certificate }).await? + } + HostMsg::GetDecidedBlock { height, reply_to } => { + let reply_to = create_reply_channel(reply_to).await?; + self.sender + .send(ChannelMsg::GetDecidedBlock { height, reply_to }) + .await?; + } + HostMsg::ProcessSyncedBlockBytes { + height, + round, + validator_address, + block_bytes, + reply_to, + } => { + let reply_to = create_reply_channel(reply_to).await?; + self.sender + .send(ChannelMsg::ProcessSyncedBlockBytes { + height, + round, + validator_address, + block_bytes, + reply_to, + }) + .await?; + } + }; + Ok(()) + } +} + +async fn create_reply_channel( + reply_to: RpcReplyPort, +) -> Result, ActorProcessingErr> +where + T: Send + 'static, +{ + let (tx, rx) = tokio::sync::oneshot::channel::(); + + tokio::spawn(async move { reply_to.send(rx.await.unwrap()) }).await??; + + Ok(tx) +} diff --git a/code/crates/app-channel/src/channel.rs b/code/crates/app-channel/src/channel.rs new file mode 100644 index 000000000..4fb071e76 --- /dev/null +++ b/code/crates/app-channel/src/channel.rs @@ -0,0 +1,75 @@ +use bytes::Bytes; +use libp2p_identity::PeerId; +use malachite_actors::host::LocallyProposedValue; +use malachite_actors::util::streaming::StreamMessage; +use malachite_blocksync::SyncedBlock; +use malachite_common::{CommitCertificate, Context, Round, ValueId}; +use malachite_consensus::ProposedValue; +use std::time::Duration; +use tokio::sync::oneshot::Sender; + +/// Messages that will be sent on the channel. +pub enum ChannelMsg { + /// Consensus has started a new round. + StartRound { + height: Ctx::Height, + round: Round, + proposer: Ctx::Address, + }, + + /// Request to build a local block/value from Driver + GetValue { + height: Ctx::Height, + round: Round, + timeout_duration: Duration, + address: Ctx::Address, + reply_to: Sender>, + }, + + /// Request to restream an existing block/value from Driver + RestreamValue { + height: Ctx::Height, + round: Round, + valid_round: Round, + address: Ctx::Address, + value_id: ValueId, + }, + + /// Request the earliest block height in the block store + GetEarliestBlockHeight { + reply_to: Sender, + }, + + /// ProposalPart received <-- consensus <-- gossip + ReceivedProposalPart { + from: PeerId, + part: StreamMessage, + reply_to: Sender>, + }, + + /// Get the validator set at a given height + GetValidatorSet { + height: Ctx::Height, + reply_to: Sender, + }, + + // Consensus has decided on a value + Decide { + certificate: CommitCertificate, + }, + + // Retrieve decided block from the block store + GetDecidedBlock { + height: Ctx::Height, + reply_to: Sender>>, + }, + + // Synced block + ProcessSyncedBlockBytes { + height: Ctx::Height, + round: Round, + validator_address: Ctx::Address, + block_bytes: Bytes, + reply_to: Sender>, + }, +} diff --git a/code/crates/app-channel/src/lib.rs b/code/crates/app-channel/src/lib.rs new file mode 100644 index 000000000..300275d87 --- /dev/null +++ b/code/crates/app-channel/src/lib.rs @@ -0,0 +1,4 @@ +pub mod actor; +pub mod channel; +pub mod run; +pub mod spawn; diff --git a/code/crates/app-channel/src/run.rs b/code/crates/app-channel/src/run.rs new file mode 100644 index 000000000..008ea4c54 --- /dev/null +++ b/code/crates/app-channel/src/run.rs @@ -0,0 +1,86 @@ +use crate::channel::ChannelMsg; +use crate::spawn::{ + spawn_block_sync_actor, spawn_consensus_actor, spawn_gossip_consensus_actor, spawn_host_actor, +}; +use malachite_actors::util::codec::NetworkCodec; +use malachite_actors::util::streaming::StreamMessage; +use malachite_common::Context; +use malachite_config::Config as NodeConfig; +use malachite_consensus::SignedConsensusMsg; +use malachite_gossip_consensus::Keypair; +use malachite_metrics::{Metrics, SharedRegistry}; +use malachite_node::Node; +use tokio::sync::mpsc::Receiver; + +// Todo: Remove clippy exception when the function signature is finalized +#[allow(clippy::too_many_arguments)] +pub async fn run( + cfg: NodeConfig, // KEEP IT: metrics registry uses moniker gossip_consensus_actor use p2p settings + start_height: Option, // keep it or set it to default. maybe add it to config? blocksync and consensus actors use it + ctx: Ctx, + _node: N, // we will neeed it to get private/public key, address and eventually KeyPair + codec: Codec, + keypair: Keypair, // Todo: see note in code + address: Ctx::Address, // Todo: remove it when Node was properly implemented + initial_validator_set: Ctx::ValidatorSet, +) -> Result>, String> +where + N: Node, + Ctx: Context, + Codec: NetworkCodec, + Codec: NetworkCodec>, + Codec: NetworkCodec>, + Codec: NetworkCodec>, + Codec: NetworkCodec>, + Codec: NetworkCodec>, +{ + let start_height = start_height.unwrap_or_default(); + + let registry = SharedRegistry::global().with_moniker(cfg.moniker.as_str()); + let metrics = Metrics::register(®istry); + + // The key types are not generic enough to create a gossip_consensus::KeyPair, but the current + // libp2p implementation requires a KeyPair in SwarmBuilder::with_existing_identity. + // We either decide on a specific keytype (ed25519 or ecdsa) or keep asking the user for the + // KeyPair. + // let private_key = node.load_private_key(node.load_private_key_file(&home_dir).unwrap()); + // let public_key = node.generate_public_key(private_key); + // let address: Ctx::Address = node.get_address(public_key); + // let pk_bytes = private_key.inner().to_bytes_be(); + // let secret_key = ecdsa::SecretKey::try_from_bytes(pk_bytes).unwrap(); + // let ecdsa_keypair = ecdsa::Keypair::from(secret_key); + // Keypair::from(ecdsa_keypair) + + // Spawn consensus gossip + let gossip_consensus = spawn_gossip_consensus_actor(&cfg, keypair, ®istry, codec).await; + + // Spawn the host actor + let (connector, rx) = spawn_host_actor(metrics.clone()).await; + + let block_sync = spawn_block_sync_actor( + ctx.clone(), + gossip_consensus.clone(), + connector.clone(), + &cfg.blocksync, + start_height, + ®istry, + ) + .await; + + // Spawn consensus + let _consensus = spawn_consensus_actor( + start_height, + initial_validator_set, + address, + ctx.clone(), + cfg, + gossip_consensus.clone(), + connector.clone(), + block_sync.clone(), + metrics, + None, // tx_decision + ) + .await; + + Ok(rx) +} diff --git a/code/crates/app-channel/src/spawn.rs b/code/crates/app-channel/src/spawn.rs new file mode 100644 index 000000000..761503580 --- /dev/null +++ b/code/crates/app-channel/src/spawn.rs @@ -0,0 +1,154 @@ +use crate::actor::Connector; +use crate::channel::ChannelMsg; +use malachite_actors::block_sync::{BlockSync, BlockSyncRef, Params as BlockSyncParams}; +use malachite_actors::consensus::{Consensus, ConsensusParams, ConsensusRef}; +use malachite_actors::gossip_consensus::{GossipConsensus, GossipConsensusRef}; +use malachite_actors::util::codec::NetworkCodec; +use malachite_actors::util::streaming::StreamMessage; +use malachite_common::{CommitCertificate, Context}; +use malachite_config::{BlockSyncConfig, Config as NodeConfig, PubSubProtocol, TransportProtocol}; +use malachite_consensus::{SignedConsensusMsg, ValuePayload}; +use malachite_gossip_consensus::Keypair; +use malachite_gossip_consensus::{ + Config as GossipConsensusConfig, DiscoveryConfig, GossipSubConfig, +}; +use malachite_metrics::{Metrics, SharedRegistry}; +use std::time::Duration; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Receiver; + +pub async fn spawn_gossip_consensus_actor( + cfg: &NodeConfig, + keypair: Keypair, + registry: &SharedRegistry, + codec: Codec, +) -> GossipConsensusRef +where + Ctx: Context, + Codec: NetworkCodec, + Codec: NetworkCodec>, + Codec: NetworkCodec>, + Codec: NetworkCodec>, + Codec: NetworkCodec>, + Codec: NetworkCodec>, +{ + let config_gossip = GossipConsensusConfig { + listen_addr: cfg.consensus.p2p.listen_addr.clone(), + persistent_peers: cfg.consensus.p2p.persistent_peers.clone(), + discovery: DiscoveryConfig { + enabled: cfg.consensus.p2p.discovery.enabled, + ..Default::default() + }, + idle_connection_timeout: Duration::from_secs(15 * 60), + transport: match cfg.consensus.p2p.transport { + TransportProtocol::Tcp => malachite_gossip_consensus::TransportProtocol::Tcp, + TransportProtocol::Quic => malachite_gossip_consensus::TransportProtocol::Quic, + }, + protocol: match cfg.consensus.p2p.protocol { + PubSubProtocol::GossipSub(config) => { + malachite_gossip_consensus::PubSubProtocol::GossipSub(GossipSubConfig { + mesh_n: config.mesh_n(), + mesh_n_high: config.mesh_n_high(), + mesh_n_low: config.mesh_n_low(), + mesh_outbound_min: config.mesh_outbound_min(), + }) + } + PubSubProtocol::Broadcast => malachite_gossip_consensus::PubSubProtocol::Broadcast, + }, + rpc_max_size: cfg.consensus.p2p.rpc_max_size.as_u64() as usize, + pubsub_max_size: cfg.consensus.p2p.pubsub_max_size.as_u64() as usize, + }; + + GossipConsensus::spawn(keypair, config_gossip, registry.clone(), codec) + .await + .unwrap() +} + +#[allow(clippy::too_many_arguments)] +pub async fn spawn_consensus_actor( + start_height: Ctx::Height, + initial_validator_set: Ctx::ValidatorSet, + address: Ctx::Address, + ctx: Ctx, + cfg: NodeConfig, + gossip_consensus: GossipConsensusRef, + host: malachite_actors::host::HostRef, + block_sync: Option>, + metrics: Metrics, + tx_decision: Option>>, +) -> ConsensusRef +where + Ctx: Context, +{ + let value_payload = match cfg.consensus.value_payload { + malachite_config::ValuePayload::PartsOnly => ValuePayload::PartsOnly, + malachite_config::ValuePayload::ProposalOnly => ValuePayload::ProposalOnly, + malachite_config::ValuePayload::ProposalAndParts => ValuePayload::ProposalAndParts, + }; + + let consensus_params = ConsensusParams { + start_height, + initial_validator_set, + address, + threshold_params: Default::default(), + value_payload, + }; + + Consensus::spawn( + ctx, + consensus_params, + cfg.consensus.timeouts, + gossip_consensus, + host, + block_sync, + metrics, + tx_decision, + ) + .await + .unwrap() +} + +pub async fn spawn_block_sync_actor( + ctx: Ctx, + gossip_consensus: GossipConsensusRef, + host: malachite_actors::host::HostRef, + config: &BlockSyncConfig, + initial_height: Ctx::Height, + registry: &SharedRegistry, +) -> Option> +where + Ctx: Context, +{ + if !config.enabled { + return None; + } + + let params = BlockSyncParams { + status_update_interval: config.status_update_interval, + request_timeout: config.request_timeout, + }; + + let metrics = malachite_blocksync::Metrics::register(registry); + let block_sync = BlockSync::new(ctx, gossip_consensus, host, params, metrics); + let (actor_ref, _) = block_sync.spawn(initial_height).await.unwrap(); + + Some(actor_ref) +} + +#[allow(clippy::too_many_arguments)] +pub async fn spawn_host_actor( + metrics: Metrics, +) -> ( + malachite_actors::host::HostRef, + Receiver>, +) +where + Ctx: Context, +{ + let (tx, rx) = mpsc::channel(1); + + let actor_ref = Connector::spawn(tx, metrics).await.unwrap(); + + (actor_ref, rx) +}