Skip to content

Commit

Permalink
initial commit
Browse files Browse the repository at this point in the history
  • Loading branch information
greg-szabo committed Nov 22, 2024
1 parent e1c8ab2 commit 9f15194
Show file tree
Hide file tree
Showing 8 changed files with 546 additions and 0 deletions.
18 changes: 18 additions & 0 deletions code/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions code/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ resolver = "2"

members = [
"crates/actors",
"crates/app-channel",
"crates/blocksync",
"crates/cli",
"crates/common",
Expand Down Expand Up @@ -54,6 +55,7 @@ unused_crate_dependencies = "warn"

[workspace.dependencies]
malachite-actors = { version = "0.1.0", path = "crates/actors" }
malachite-app-channel = { version = "0.1.0", path = "crates/app-channel" }
malachite-blocksync = { version = "0.1.0", path = "crates/blocksync" }
malachite-cli = { version = "0.1.0", path = "crates/cli" }
malachite-common = { version = "0.1.0", path = "crates/common" }
Expand Down
26 changes: 26 additions & 0 deletions code/crates/app-channel/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
[package]
name = "malachite-app-channel"
version.workspace = true
edition.workspace = true
repository.workspace = true
license.workspace = true
publish.workspace = true
rust-version.workspace = true

[dependencies]
bytes.workspace = true
libp2p-identity.workspace = true
ractor.workspace = true
tokio.workspace = true

malachite-actors.workspace = true
malachite-blocksync.workspace = true
malachite-common.workspace = true
malachite-config.workspace = true
malachite-consensus.workspace = true
malachite-gossip-consensus.workspace = true
malachite-metrics.workspace = true
malachite-node.workspace = true

[lints]
workspace = true
181 changes: 181 additions & 0 deletions code/crates/app-channel/src/actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
use crate::channel::ChannelMsg;
use malachite_actors::host::HostMsg;
use malachite_common::Context;
use malachite_metrics::Metrics;
use ractor::{async_trait, Actor, ActorProcessingErr, ActorRef, RpcReplyPort, SpawnErr};
use tokio::sync::mpsc::Sender;
use tokio::sync::oneshot::Sender as OneShotSender;

pub struct Connector<Ctx>
where
Ctx: Context,
{
sender: Sender<ChannelMsg<Ctx>>,
// Todo: add some metrics
#[allow(dead_code)]
metrics: Metrics,
}

impl<Ctx> Connector<Ctx>
where
Ctx: Context,
{
pub fn new(sender: Sender<ChannelMsg<Ctx>>, metrics: Metrics) -> Self {
Connector { sender, metrics }
}

pub async fn spawn(
sender: Sender<ChannelMsg<Ctx>>,
metrics: Metrics,
) -> Result<ActorRef<HostMsg<Ctx>>, SpawnErr>
where
Ctx: Context,
{
let (actor_ref, _) = Actor::spawn(None, Self::new(sender, metrics), ()).await?;
Ok(actor_ref)
}
}

#[async_trait]
impl<Ctx> Actor for Connector<Ctx>
where
Ctx: Context,
{
type Msg = HostMsg<Ctx>;
type State = ();
type Arguments = ();

async fn pre_start(
&self,
_myself: ActorRef<Self::Msg>,
_args: Self::Arguments,
) -> Result<Self::State, ActorProcessingErr> {
Ok(())
}

async fn handle(
&self,
_myself: ActorRef<Self::Msg>,
message: Self::Msg,
_state: &mut Self::State,
) -> Result<(), ActorProcessingErr> {
match message {
HostMsg::StartRound {
height,
round,
proposer,
} => {
self.sender
.send(ChannelMsg::StartRound {
height,
round,
proposer,
})
.await?
}
HostMsg::GetValue {
height,
round,
timeout_duration,
address,
reply_to,
} => {
let reply_to = create_reply_channel(reply_to).await?;

self.sender
.send(ChannelMsg::GetValue {
height,
round,
timeout_duration,
address,
reply_to,
})
.await?
}
HostMsg::RestreamValue {
height,
round,
valid_round,
address,
value_id,
} => {
self.sender
.send(ChannelMsg::RestreamValue {
height,
round,
valid_round,
address,
value_id,
})
.await?
}
HostMsg::GetEarliestBlockHeight { reply_to } => {
let reply_to = create_reply_channel(reply_to).await?;
self.sender
.send(ChannelMsg::GetEarliestBlockHeight { reply_to })
.await?;
}
HostMsg::ReceivedProposalPart {
from,
part,
reply_to,
} => {
let reply_to = create_reply_channel(reply_to).await?;
self.sender
.send(ChannelMsg::ReceivedProposalPart {
from,
part,
reply_to,
})
.await?;
}
HostMsg::GetValidatorSet { height, reply_to } => {
let reply_to = create_reply_channel(reply_to).await?;
self.sender
.send(ChannelMsg::GetValidatorSet { height, reply_to })
.await?;
}
HostMsg::Decide { certificate, .. } => {
self.sender.send(ChannelMsg::Decide { certificate }).await?
}
HostMsg::GetDecidedBlock { height, reply_to } => {
let reply_to = create_reply_channel(reply_to).await?;
self.sender
.send(ChannelMsg::GetDecidedBlock { height, reply_to })
.await?;
}
HostMsg::ProcessSyncedBlockBytes {
height,
round,
validator_address,
block_bytes,
reply_to,
} => {
let reply_to = create_reply_channel(reply_to).await?;
self.sender
.send(ChannelMsg::ProcessSyncedBlockBytes {
height,
round,
validator_address,
block_bytes,
reply_to,
})
.await?;
}
};
Ok(())
}
}

async fn create_reply_channel<T>(
reply_to: RpcReplyPort<T>,
) -> Result<OneShotSender<T>, ActorProcessingErr>
where
T: Send + 'static,
{
let (tx, rx) = tokio::sync::oneshot::channel::<T>();

tokio::spawn(async move { reply_to.send(rx.await.unwrap()) }).await??;

Ok(tx)
}
75 changes: 75 additions & 0 deletions code/crates/app-channel/src/channel.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
use bytes::Bytes;
use libp2p_identity::PeerId;
use malachite_actors::host::LocallyProposedValue;
use malachite_actors::util::streaming::StreamMessage;
use malachite_blocksync::SyncedBlock;
use malachite_common::{CommitCertificate, Context, Round, ValueId};
use malachite_consensus::ProposedValue;
use std::time::Duration;
use tokio::sync::oneshot::Sender;

/// Messages that will be sent on the channel.
pub enum ChannelMsg<Ctx: Context> {
/// Consensus has started a new round.
StartRound {
height: Ctx::Height,
round: Round,
proposer: Ctx::Address,
},

/// Request to build a local block/value from Driver
GetValue {
height: Ctx::Height,
round: Round,
timeout_duration: Duration,
address: Ctx::Address,
reply_to: Sender<LocallyProposedValue<Ctx>>,
},

/// Request to restream an existing block/value from Driver
RestreamValue {
height: Ctx::Height,
round: Round,
valid_round: Round,
address: Ctx::Address,
value_id: ValueId<Ctx>,
},

/// Request the earliest block height in the block store
GetEarliestBlockHeight {
reply_to: Sender<Ctx::Height>,
},

/// ProposalPart received <-- consensus <-- gossip
ReceivedProposalPart {
from: PeerId,
part: StreamMessage<Ctx::ProposalPart>,
reply_to: Sender<ProposedValue<Ctx>>,
},

/// Get the validator set at a given height
GetValidatorSet {
height: Ctx::Height,
reply_to: Sender<Ctx::ValidatorSet>,
},

// Consensus has decided on a value
Decide {
certificate: CommitCertificate<Ctx>,
},

// Retrieve decided block from the block store
GetDecidedBlock {
height: Ctx::Height,
reply_to: Sender<Option<SyncedBlock<Ctx>>>,
},

// Synced block
ProcessSyncedBlockBytes {
height: Ctx::Height,
round: Round,
validator_address: Ctx::Address,
block_bytes: Bytes,
reply_to: Sender<ProposedValue<Ctx>>,
},
}
4 changes: 4 additions & 0 deletions code/crates/app-channel/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
pub mod actor;
pub mod channel;
pub mod run;
pub mod spawn;
Loading

0 comments on commit 9f15194

Please sign in to comment.