Skip to content

Commit

Permalink
Use nimiq-time everywhere
Browse files Browse the repository at this point in the history
Also make sure we continue to do so.
  • Loading branch information
hrxi committed Nov 28, 2024
1 parent f831372 commit 3d4bd1b
Show file tree
Hide file tree
Showing 25 changed files with 109 additions and 57 deletions.
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ log = { workspace = true }
tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "time", "tracing"] }
tokio-metrics = "0.3"

nimiq-time = { workspace = true }
nimiq-utils = { workspace = true, features = ["spawn"] }

[dependencies.nimiq]
Expand Down
6 changes: 4 additions & 2 deletions client/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::time::Duration;

use futures::StreamExt as _;
use log::info;
use nimiq::prover::prover_main;
pub use nimiq::{
Expand All @@ -13,6 +14,7 @@ pub use nimiq::{
signal_handling::initialize_signal_handler,
},
};
use nimiq_time::interval;
use nimiq_utils::spawn;

async fn main_inner() -> Result<(), Error> {
Expand Down Expand Up @@ -161,9 +163,9 @@ async fn main_inner() -> Result<(), Error> {
}

// Run periodically
let mut interval = tokio::time::interval(Duration::from_secs(statistics_interval));
let mut interval = interval(Duration::from_secs(statistics_interval));
loop {
interval.tick().await;
interval.next().await;

if show_statistics {
match client.network().network_info().await {
Expand Down
36 changes: 36 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ reason = "calling asynchronous code from synchronous code is almost always an er
path = "futures_executor::block_on"
reason = "calling asynchronous code from synchronous code is almost always an error, use `tokio::runtime::Handle::current().block_on` if it's not an error"

[[disallowed-methods]]
path = "gloo_timers::future::sleep"
reason = "use `nimiq_time::sleep` instead, it is also supported in non-WASM environments"

[[disallowed-methods]]
path = "tokio::task::spawn"
reason = "use `nimiq_utils::spawn` instead, it is also supported in WASM environments"
Expand All @@ -18,6 +22,26 @@ reason = "use `nimiq_utils::spawn_local` instead, it is also supported in WASM e
path = "wasm_bindgen_futures::spawn_local"
reason = "use `nimiq_utils::spawn` or `nimq_utils::spawn_local` instead, it is also supported in non-WASM environments"

[[disallowed-methods]]
path = "tokio::time::interval"
reason = "use `nimiq_time::interval` instead, it is also supported in WASM environments"

[[disallowed-methods]]
path = "tokio::time::interval_at"
reason = "use `nimiq_time::interval` instead, it is also supported in WASM environments"

[[disallowed-methods]]
path = "tokio::time::sleep"
reason = "use `nimiq_time::sleep` instead, it is also supported in WASM environments"

[[disallowed-methods]]
path = "tokio::time::sleep_until"
reason = "use `nimiq_time::sleep_until` instead, it is also supported in WASM environments"

[[disallowed-methods]]
path = "tokio::time::timeout"
reason = "use `nimiq_time::timeout` instead, it is also supported in WASM environments"

[[disallowed-types]]
path = "futures_util::stream::FuturesUnordered"
reason = "use `nimiq_utils::stream::FuturesUnordered` instead, it does not need manual `Waker`s"
Expand All @@ -29,3 +53,15 @@ reason = "use `nimiq_utils::stream::FuturesOrdered` instead, it does not need ma
[[disallowed-types]]
path = "futures_util::stream::SelectAll"
reason = "use `nimiq_utils::stream::SelectAll` instead, it does not need manual `Waker`s"

[[disallowed-types]]
path = "futures_timer::Delay"
reason = "use `nimiq_time::sleep` instead"

[[disallowed-types]]
path = "gloo_timers::future::IntervalStream"
reason = "use `nimiq_time::interval` instead, it is also supported in non-WASM environments"

[[disallowed-types]]
path = "gloo_timers::future::TimeoutFuture"
reason = "use `nimiq_time::timeout` instead, it is also supported in non-WASM environments"
7 changes: 4 additions & 3 deletions handel/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ mod test {
use futures::FutureExt;
use nimiq_collections::BitSet;
use nimiq_test_log::test;
use nimiq_time::sleep;
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -239,7 +240,7 @@ mod test {
self.0.lock().push((update, node_id));

async move {
nimiq_time::sleep(Duration::from_millis(100)).await;
sleep(Duration::from_millis(100)).await;
Ok(())
}
}
Expand Down Expand Up @@ -296,7 +297,7 @@ mod test {
// Clear the buffer so test starts from scratch
t.lock().clear();
// Needed because the send also sleeps
nimiq_time::sleep(Duration::from_millis(110)).await;
sleep(Duration::from_millis(110)).await;

assert_eq!(0, t.lock().len());
send(&mut sender, 0);
Expand All @@ -320,7 +321,7 @@ mod test {
assert_eq!(10, t.lock().len());

// Wait for the futures to resolve, imitating a delay
nimiq_time::sleep(Duration::from_millis(150)).await;
sleep(Duration::from_millis(150)).await;
// Send some more
send(&mut sender, 9); // Not a Duplicate, this should be accepted
send(&mut sender, 8); // Not a Duplicate, this should be accepted
Expand Down
1 change: 1 addition & 0 deletions lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ nimiq-network-interface = { workspace = true }
nimiq-primitives = { workspace = true, features = ["networks"] }
nimiq-rpc-server = { workspace = true, optional = true }
nimiq-serde = { workspace = true }
nimiq-time = { workspace = true }
nimiq-utils = { workspace = true, features = ["time", "key-store"] }
nimiq-validator = { workspace = true, optional = true, features = [
"trusted_push",
Expand Down
4 changes: 3 additions & 1 deletion lib/src/extras/signal_handling.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::time::Duration;

use nimiq_time::sleep;
use nimiq_utils::spawn;
use signal_hook::{consts::SIGINT, iterator::Signals};
use tokio::time::{sleep, Duration};

pub fn initialize_signal_handler() {
let signals = Signals::new([SIGINT]);
Expand Down
3 changes: 2 additions & 1 deletion metrics-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ workspace = true

[dependencies]
bytes = "1.8"
futures = "0.3"
futures = { workspace = true }
http-body-util = { version = "0.1" }
hyper = { version = "1.5" }
hyper-util = { version = "0.1", features = ["server-auto", "tokio"] }
Expand All @@ -44,6 +44,7 @@ nimiq-mempool = { workspace = true, features = ["metrics"] }
nimiq-network-interface = { workspace = true }
nimiq-network-libp2p = { workspace = true, features = ["metrics"] }
nimiq-primitives = { workspace = true, features = ["coin"], optional = true }
nimiq-time = { workspace = true }
nimiq-utils = { workspace = true, features = ["spawn"] }

[features]
Expand Down
3 changes: 2 additions & 1 deletion metrics-server/src/tokio_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use nimiq_time::interval;
use parking_lot::RwLock;
use prometheus_client::{metrics::gauge::Gauge, registry::Registry};
use tokio_metrics::{RuntimeMetrics, RuntimeMonitor};
Expand Down Expand Up @@ -114,7 +115,7 @@ impl TokioRuntimeMetrics {
runtime_monitor: RuntimeMonitor,
) {
let tokio_rt_metrics = tokio_rt_metrics.clone();
let mut interval = tokio::time::interval(Duration::from_secs(TOKIO_METRICS_FREQ_SECS));
let mut interval = interval(Duration::from_secs(TOKIO_METRICS_FREQ_SECS));
let mut runtime_intervals = runtime_monitor.intervals();

loop {
Expand Down
6 changes: 4 additions & 2 deletions metrics-server/src/tokio_task.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::{collections::HashMap, sync::Arc, time::Duration};

use futures::StreamExt as _;
use nimiq_time::interval;
use parking_lot::RwLock;
use prometheus_client::{metrics::gauge::Gauge, registry::Registry};
use tokio_metrics::{TaskMetrics, TaskMonitor};
Expand Down Expand Up @@ -83,11 +85,11 @@ impl TokioTaskMetrics {
runtime_monitor: TaskMonitor,
) {
let tokio_task_metrics = tokio_task_metrics.clone();
let mut interval = tokio::time::interval(Duration::from_secs(TOKIO_METRICS_FREQ_SECS));
let mut interval = interval(Duration::from_secs(TOKIO_METRICS_FREQ_SECS));
let mut runtime_intervals = runtime_monitor.intervals();

loop {
interval.tick().await;
interval.next().await;
if let Some(interval) = runtime_intervals.next() {
tokio_task_metrics
.write()
Expand Down
1 change: 0 additions & 1 deletion network-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ async-trait = "0.1"
base64 = "0.22"
bytes = "1.8"
futures = { workspace = true }
futures-timer = "3.0"
hex = "0.4"
instant = { version = "0.1", features = ["wasm-bindgen"] }
ip_network = "0.4"
Expand Down
8 changes: 4 additions & 4 deletions network-libp2p/src/connection_pool/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
use std::{
collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque},
future::Future,
net::IpAddr,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
time::Duration,
};

use futures::{future::BoxFuture, Future, FutureExt, StreamExt};
use instant::Instant;
use futures::{FutureExt, StreamExt};
use ip_network::IpNetwork;
use libp2p::{
core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint},
Expand All @@ -21,7 +21,7 @@ use libp2p::{
Multiaddr, PeerId, TransportError,
};
use nimiq_network_interface::{network::CloseReason, peer_info::Services};
use nimiq_time::{interval, sleep_until, Interval};
use nimiq_time::{interval, sleep_until, Instant, Interval, Sleep};
use nimiq_utils::WakerExt as _;
use parking_lot::RwLock;
use rand::{seq::IteratorRandom, thread_rng};
Expand Down Expand Up @@ -102,7 +102,7 @@ struct ConnectionState<T> {
/// List of subsequent banned peers with their unban deadlines in ascending order.
unban_deadlines: VecDeque<(T, Instant)>,
/// Deadline for first peer that can be unbanned.
unban_timeout: Option<(T, BoxFuture<'static, ()>)>,
unban_timeout: Option<(T, Pin<Box<Sleep>>)>,
/// The time that needs to pass to unban a banned peer.
ban_time: Duration,
/// Set of connection IDs mark as failed.
Expand Down
18 changes: 9 additions & 9 deletions network-libp2p/src/discovery/handler.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use std::{
collections::{HashSet, VecDeque},
future::Future as _,
pin::Pin,
sync::Arc,
task::{Context, Poll, Waker},
time::Duration,
};

use futures::{FutureExt, Sink, SinkExt, StreamExt};
use futures_timer::Delay;
use futures::{Sink, SinkExt, StreamExt};
use instant::Instant;
use libp2p::{
identity::Keypair,
Expand All @@ -24,7 +24,7 @@ use libp2p::{
use nimiq_hash::Blake2bHash;
use nimiq_network_interface::peer_info::Services;
use nimiq_serde::DeserializeError;
use nimiq_time::{interval, Interval};
use nimiq_time::{interval, sleep, Interval, Sleep};
use nimiq_utils::tagged_signing::TaggedKeyPair;
use parking_lot::RwLock;
use rand::{seq::IteratorRandom, thread_rng};
Expand Down Expand Up @@ -144,7 +144,7 @@ pub struct Handler {
state: HandlerState,

/// Future that fires on a state change timeout
state_timeout: Option<Delay>,
state_timeout: Option<Pin<Box<Sleep>>>,

/// Services filter sent to us by this peer.
services_filter: Services,
Expand Down Expand Up @@ -248,7 +248,7 @@ impl Handler {
fn check_initialized(&mut self) {
if self.inbound.is_some() && self.outbound.is_some() {
self.state = HandlerState::SendHandshake;
self.state_timeout = Some(Delay::new(Self::STATE_TRANSITION_TIMEOUT));
self.state_timeout = Some(Box::pin(sleep(Self::STATE_TRANSITION_TIMEOUT)));

self.waker
.take()
Expand Down Expand Up @@ -352,7 +352,7 @@ impl ConnectionHandler for Handler {
loop {
// Check if we hit the state transition timeout
if let Some(ref mut state_timeout) = self.state_timeout {
if state_timeout.poll_unpin(cx).is_ready() {
if state_timeout.as_mut().poll(cx).is_ready() {
return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerOutEvent::Error(Error::StateTransitionTimeout { state: self.state }),
));
Expand Down Expand Up @@ -382,7 +382,7 @@ impl ConnectionHandler for Handler {
HandlerState::Init => {
// Request outbound substream
self.state = HandlerState::OpenSubstream;
self.state_timeout = Some(Delay::new(Self::STATE_TRANSITION_TIMEOUT));
self.state_timeout = Some(Box::pin(sleep(Self::STATE_TRANSITION_TIMEOUT)));

return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest {
protocol: SubstreamProtocol::new(DiscoveryProtocol, ()),
Expand Down Expand Up @@ -417,7 +417,7 @@ impl ConnectionHandler for Handler {
}

self.state = HandlerState::ReceiveHandshake;
self.state_timeout = Some(Delay::new(Self::STATE_TRANSITION_TIMEOUT));
self.state_timeout = Some(Box::pin(sleep(Self::STATE_TRANSITION_TIMEOUT)));
}

HandlerState::ReceiveHandshake => {
Expand Down Expand Up @@ -483,7 +483,7 @@ impl ConnectionHandler for Handler {

self.state = HandlerState::ReceiveHandshakeAck;
self.state_timeout =
Some(Delay::new(Self::STATE_TRANSITION_TIMEOUT));
Some(Box::pin(sleep(Self::STATE_TRANSITION_TIMEOUT)));

return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
HandlerOutEvent::ObservedAddress { observed_address },
Expand Down
Loading

0 comments on commit 3d4bd1b

Please sign in to comment.