diff --git a/consensus/src/consensus/mod.rs b/consensus/src/consensus/mod.rs index 8d4fac8366..a349f5287e 100644 --- a/consensus/src/consensus/mod.rs +++ b/consensus/src/consensus/mod.rs @@ -17,7 +17,10 @@ use nimiq_blockchain_proxy::BlockchainProxy; #[cfg(feature = "full")] use nimiq_blockchain_proxy::BlockchainReadProxy; use nimiq_hash::Blake2bHash; -use nimiq_network_interface::{network::Network, request::request_handler}; +use nimiq_network_interface::{ + network::{DhtMode, Network}, + request::request_handler, +}; use nimiq_time::{interval, Interval}; use nimiq_utils::{spawn, WakerExt}; use nimiq_zkp_component::zkp_component::ZKPComponentProxy; @@ -391,8 +394,7 @@ impl Consensus { if self.is_established() { if self.num_agents() < self.min_peers { warn!("Lost consensus!"); - self.established_flag.swap(false, Ordering::Release); - return Some(ConsensusEvent::Lost); + return Some(self.on_consensus_lost()); } // Check if validity window availability changed. if let (_, Some(event)) = self.check_validity_window() { @@ -408,19 +410,12 @@ impl Consensus { if self.num_agents() >= self.min_peers && self.sync.state_complete() { if self.sync.accepted_block_announcements() >= Self::MIN_BLOCKS_ESTABLISHED { info!("Consensus established, number of accepted announcements satisfied."); - self.established_flag.swap(true, Ordering::Release); // Also stop any other checks. self.head_requests = None; self.head_requests_time = None; - self.zkp_proxy - .request_zkp_from_peers(self.sync.peers(), false); - - let (synced_validity_window, _) = self.check_validity_window(); - return Some(ConsensusEvent::Established { - synced_validity_window, - }); + return Some(self.on_consensus_established()); } else { // The head state check is carried out immediately after we reach the minimum // number of peers and then after certain time intervals until consensus is reached. @@ -430,15 +425,7 @@ impl Consensus { // We would like that 2/3 of our peers have a known state. if head_request.num_known_blocks >= 2 * head_request.num_unknown_blocks { info!("Consensus established, 2/3 of heads known."); - self.established_flag.swap(true, Ordering::Release); - - self.zkp_proxy - .request_zkp_from_peers(self.sync.peers(), false); - - let (synced_validity_window, _) = self.check_validity_window(); - return Some(ConsensusEvent::Established { - synced_validity_window, - }); + return Some(self.on_consensus_established()); } } @@ -450,6 +437,30 @@ impl Consensus { None } + fn on_consensus_established(&mut self) -> ConsensusEvent { + self.established_flag.swap(true, Ordering::Release); + + self.zkp_proxy + .request_zkp_from_peers(self.sync.peers(), false); + + let network = Arc::clone(&self.network); + spawn(async move { network.dht_set_mode(DhtMode::Server).await }); + + let (synced_validity_window, _) = self.check_validity_window(); + ConsensusEvent::Established { + synced_validity_window, + } + } + + fn on_consensus_lost(&mut self) -> ConsensusEvent { + self.established_flag.swap(false, Ordering::Release); + + let network = Arc::clone(&self.network); + spawn(async move { network.dht_set_mode(DhtMode::Client).await }); + + ConsensusEvent::Lost + } + /// Requests heads from connected peers. fn request_heads(&mut self) { // Wait for an ongoing head request to finish. diff --git a/network-interface/src/network.rs b/network-interface/src/network.rs index ce36ac6989..9bb85c2d81 100644 --- a/network-interface/src/network.rs +++ b/network-interface/src/network.rs @@ -90,6 +90,12 @@ pub trait RequestResponse { type Response: Serialize + Deserialize + Sync; } +#[derive(Copy, Clone, Debug, Eq, PartialEq)] +pub enum DhtMode { + Client, + Server, +} + #[async_trait] pub trait Network: Send + Sync + Unpin + 'static { type PeerId: Copy + Debug + Display + Ord + Hash + Send + Sync + Unpin + 'static; @@ -170,6 +176,9 @@ pub trait Network: Send + Sync + Unpin + 'static { where T: Topic + Sync; + /// Sets the current operation mode (client/server) for the DHT protocol + async fn dht_set_mode(&self, mode: DhtMode); + /// Gets a value from the distributed hash table async fn dht_get(&self, k: &K) -> Result, Self::Error> where diff --git a/network-libp2p/src/autonat.rs b/network-libp2p/src/autonat.rs index 646790a5bb..1057f5875d 100644 --- a/network-libp2p/src/autonat.rs +++ b/network-libp2p/src/autonat.rs @@ -26,6 +26,11 @@ pub(crate) struct NatState { } impl NatState { + /// Gets the current NAT status of the local peer + pub fn get_status(&self) -> NatStatus { + self.status + } + /// Adds an address to track its NAT status pub fn add_address(&mut self, address: Multiaddr) { self.address_status.insert(address, NatStatus::Unknown); diff --git a/network-libp2p/src/behaviour.rs b/network-libp2p/src/behaviour.rs index 15e01a31ac..dcba36a24d 100644 --- a/network-libp2p/src/behaviour.rs +++ b/network-libp2p/src/behaviour.rs @@ -61,6 +61,10 @@ impl Behaviour { #[cfg(feature = "kad")] if force_dht_server_mode { dht.set_mode(Some(kad::Mode::Server)); + } else { + // Force the DHT mode to Client initially, as we only want to allow Server mode once + // consensus is established. + dht.set_mode(Some(kad::Mode::Client)); } // Discovery behaviour diff --git a/network-libp2p/src/network.rs b/network-libp2p/src/network.rs index e60e8d471f..522eee9d3c 100644 --- a/network-libp2p/src/network.rs +++ b/network-libp2p/src/network.rs @@ -15,8 +15,8 @@ use libp2p::{ }; use nimiq_network_interface::{ network::{ - CloseReason, MsgAcceptance, Network as NetworkInterface, NetworkEvent, SubscribeEvents, - Topic, + CloseReason, DhtMode, MsgAcceptance, Network as NetworkInterface, NetworkEvent, + SubscribeEvents, Topic, }, peer_info::{PeerInfo, Services}, request::{ @@ -625,6 +625,17 @@ impl NetworkInterface for Network { .expect("Failed to send reported message validation result: receiver hung up"); } + async fn dht_set_mode(&self, mode: DhtMode) { + if let Err(error) = self + .action_tx + .clone() + .send(NetworkAction::DhtSetMode { mode }) + .await + { + error!(?mode, %error, "could not send dht_set_mode action to channel"); + }; + } + async fn dht_get(&self, k: &K) -> Result, Self::Error> where K: AsRef<[u8]> + Send + Sync, diff --git a/network-libp2p/src/network_types.rs b/network-libp2p/src/network_types.rs index 0579b1c8bf..71b59b3bad 100644 --- a/network-libp2p/src/network_types.rs +++ b/network-libp2p/src/network_types.rs @@ -12,7 +12,7 @@ use libp2p::{ }; use nimiq_keys::KeyPair; use nimiq_network_interface::{ - network::{CloseReason, MsgAcceptance, PubsubId, Topic}, + network::{CloseReason, DhtMode, MsgAcceptance, PubsubId, Topic}, peer_info::Services, request::{RequestError, RequestType}, }; @@ -39,6 +39,9 @@ pub(crate) enum NetworkAction { address: Multiaddr, output: oneshot::Sender>, }, + DhtSetMode { + mode: DhtMode, + }, DhtGet { key: Vec, output: oneshot::Sender, NetworkError>>, diff --git a/network-libp2p/src/swarm.rs b/network-libp2p/src/swarm.rs index 2d58c925d7..17ffbf0506 100644 --- a/network-libp2p/src/swarm.rs +++ b/network-libp2p/src/swarm.rs @@ -31,7 +31,7 @@ use libp2p::{ use libp2p::{dns, tcp, websocket}; use log::Instrument; use nimiq_network_interface::{ - network::{CloseReason, NetworkEvent}, + network::{CloseReason, DhtMode, NetworkEvent}, peer_info::PeerInfo, request::{peek_type, InboundRequestError, OutboundRequestError, RequestError}, }; @@ -170,7 +170,7 @@ pub(crate) async fn swarm_task( }, action = action_rx.recv() => { if let Some(action) = action { - perform_action(action, &mut swarm, &mut task_state); + perform_action(action, &mut swarm, &mut task_state, &events_tx); } else { // `action_rx.next()` will return `None` if all senders (i.e. the `Network` object) are dropped. @@ -510,7 +510,9 @@ fn handle_dht_event(event: kad::Event, event_info: EventInfo) { }, } => handle_dht_inbound_put(source, connection, record, event_info), - kad::Event::ModeChanged { new_mode } => handle_dht_mode_change(new_mode, event_info), + kad::Event::ModeChanged { new_mode } => { + handle_dht_mode_change(new_mode, event_info.state, event_info.events_tx) + } _ => {} } @@ -697,12 +699,16 @@ fn handle_dht_inbound_put( } #[cfg(feature = "kad")] -fn handle_dht_mode_change(new_mode: Mode, event_info: EventInfo) { +fn handle_dht_mode_change( + new_mode: Mode, + state: &mut TaskState, + events_tx: &broadcast::Sender>, +) { debug!(%new_mode, "DHT mode changed"); if new_mode == Mode::Server { - event_info.state.dht_server_mode = true; - if event_info.state.dht_bootstrap_state == DhtBootStrapState::Completed { - let _ = event_info.events_tx.send(NetworkEvent::DhtReady); + state.dht_server_mode = true; + if state.dht_bootstrap_state == DhtBootStrapState::Completed { + let _ = events_tx.send(NetworkEvent::DhtReady); } } } @@ -1025,7 +1031,12 @@ fn handle_request_response_inbound_failure( error!(%request_id, %peer_id, %error, "Inbound request failed"); } -fn perform_action(action: NetworkAction, swarm: &mut NimiqSwarm, state: &mut TaskState) { +fn perform_action( + action: NetworkAction, + swarm: &mut NimiqSwarm, + state: &mut TaskState, + events_tx: &broadcast::Sender>, +) { match action { NetworkAction::Dial { peer_id, output } => { let dial_opts = DialOpts::peer_id(peer_id) @@ -1041,6 +1052,27 @@ fn perform_action(action: NetworkAction, swarm: &mut NimiqSwarm, state: &mut Tas let result = swarm.dial(dial_opts).map_err(Into::into); output.send(result).ok(); } + NetworkAction::DhtSetMode { mode } => { + #[cfg(feature = "kad")] + let mode = match mode { + DhtMode::Client => Some(Mode::Client), + DhtMode::Server => { + if state.nat_status.get_status() == NatStatus::Public { + Some(Mode::Server) + } else { + // Set to auto, such that it automatically switches to server mode in case + // the NAT status changes to public later on. + None + } + } + }; + #[cfg(feature = "kad")] + swarm.behaviour_mut().dht.set_mode(mode); + #[cfg(feature = "kad")] + if let Some(mode) = mode { + handle_dht_mode_change(mode, state, events_tx); + } + } NetworkAction::DhtGet { key, output } => { #[cfg(feature = "kad")] let query_id = swarm.behaviour_mut().dht.get_record(key.into()); diff --git a/network-mock/src/network.rs b/network-mock/src/network.rs index aebb148ec3..0918792554 100644 --- a/network-mock/src/network.rs +++ b/network-mock/src/network.rs @@ -10,7 +10,8 @@ use async_trait::async_trait; use futures::{stream::BoxStream, StreamExt}; use nimiq_network_interface::{ network::{ - CloseReason, MsgAcceptance, Network, NetworkEvent, PubsubId, SubscribeEvents, Topic, + CloseReason, DhtMode, MsgAcceptance, Network, NetworkEvent, PubsubId, SubscribeEvents, + Topic, }, peer_info::{PeerInfo, Services}, request::{ @@ -534,6 +535,10 @@ impl Network for MockNetwork { // TODO implement } + async fn dht_set_mode(&self, _mode: DhtMode) { + // TODO implement + } + async fn dht_get(&self, k: &K) -> Result, Self::Error> where K: AsRef<[u8]> + Send + Sync,