diff --git a/Cargo.lock b/Cargo.lock index d10026e837..9d574d2345 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3918,6 +3918,7 @@ version = "1.0.5" dependencies = [ "futures-util", "nimiq-lib", + "nimiq-time", "nimiq-utils", "tokio", "tokio-metrics", @@ -4287,6 +4288,7 @@ dependencies = [ "nimiq-rpc-server", "nimiq-serde", "nimiq-test-log", + "nimiq-time", "nimiq-utils", "nimiq-validator", "nimiq-validator-network", @@ -4423,7 +4425,7 @@ name = "nimiq-metrics-server" version = "1.0.5" dependencies = [ "bytes", - "futures", + "futures-util", "http-body-util", "hyper 1.5.1", "hyper-util", @@ -4435,6 +4437,7 @@ dependencies = [ "nimiq-network-interface", "nimiq-network-libp2p", "nimiq-primitives", + "nimiq-time", "nimiq-utils", "parking_lot", "prometheus-client", @@ -4492,7 +4495,6 @@ dependencies = [ "async-trait", "base64 0.22.1", "bytes", - "futures-timer", "futures-util", "hex", "instant", @@ -4582,6 +4584,7 @@ dependencies = [ "nimiq-primitives", "nimiq-serde", "nimiq-test-log", + "nimiq-time", "nimiq-transaction", "nimiq-utils", "nimiq-vrf", @@ -5159,6 +5162,7 @@ dependencies = [ "nimiq-network-mock", "nimiq-primitives", "nimiq-serde", + "nimiq-time", "nimiq-transaction", "nimiq-transaction-builder", "nimiq-utils", diff --git a/client/Cargo.toml b/client/Cargo.toml index 8497d974bf..c48dfad041 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -26,6 +26,7 @@ log = { workspace = true } tokio = { version = "1.41", features = ["macros", "rt-multi-thread", "time", "tracing"] } tokio-metrics = "0.3" +nimiq-time = { workspace = true } nimiq-utils = { workspace = true, features = ["spawn"] } [dependencies.nimiq] diff --git a/client/src/main.rs b/client/src/main.rs index 570a0a1228..46c55780da 100644 --- a/client/src/main.rs +++ b/client/src/main.rs @@ -1,5 +1,6 @@ use std::time::Duration; +use futures::StreamExt as _; use log::info; use nimiq::prover::prover_main; pub use nimiq::{ @@ -13,6 +14,7 @@ pub use nimiq::{ signal_handling::initialize_signal_handler, }, }; +use nimiq_time::interval; use nimiq_utils::spawn; async fn main_inner() -> Result<(), Error> { @@ -161,9 +163,9 @@ async fn main_inner() -> Result<(), Error> { } // Run periodically - let mut interval = tokio::time::interval(Duration::from_secs(statistics_interval)); + let mut interval = interval(Duration::from_secs(statistics_interval)); loop { - interval.tick().await; + interval.next().await; if show_statistics { match client.network().network_info().await { diff --git a/clippy.toml b/clippy.toml index 395597749c..220e39e7ff 100644 --- a/clippy.toml +++ b/clippy.toml @@ -6,6 +6,10 @@ reason = "calling asynchronous code from synchronous code is almost always an er path = "futures_executor::block_on" reason = "calling asynchronous code from synchronous code is almost always an error, use `tokio::runtime::Handle::current().block_on` if it's not an error" +[[disallowed-methods]] +path = "gloo_timers::future::sleep" +reason = "use `nimiq_time::sleep` instead, it is also supported in non-WASM environments" + [[disallowed-methods]] path = "tokio::task::spawn" reason = "use `nimiq_utils::spawn` instead, it is also supported in WASM environments" @@ -18,6 +22,26 @@ reason = "use `nimiq_utils::spawn_local` instead, it is also supported in WASM e path = "wasm_bindgen_futures::spawn_local" reason = "use `nimiq_utils::spawn` or `nimq_utils::spawn_local` instead, it is also supported in non-WASM environments" +[[disallowed-methods]] +path = "tokio::time::interval" +reason = "use `nimiq_time::interval` instead, it is also supported in WASM environments" + +[[disallowed-methods]] +path = "tokio::time::interval_at" +reason = "use `nimiq_time::interval` instead, it is also supported in WASM environments" + +[[disallowed-methods]] +path = "tokio::time::sleep" +reason = "use `nimiq_time::sleep` instead, it is also supported in WASM environments" + +[[disallowed-methods]] +path = "tokio::time::sleep_until" +reason = "use `nimiq_time::sleep_until` instead, it is also supported in WASM environments" + +[[disallowed-methods]] +path = "tokio::time::timeout" +reason = "use `nimiq_time::timeout` instead, it is also supported in WASM environments" + [[disallowed-types]] path = "futures_util::stream::FuturesUnordered" reason = "use `nimiq_utils::stream::FuturesUnordered` instead, it does not need manual `Waker`s" @@ -29,3 +53,15 @@ reason = "use `nimiq_utils::stream::FuturesOrdered` instead, it does not need ma [[disallowed-types]] path = "futures_util::stream::SelectAll" reason = "use `nimiq_utils::stream::SelectAll` instead, it does not need manual `Waker`s" + +[[disallowed-types]] +path = "futures_timer::Delay" +reason = "use `nimiq_time::sleep` instead" + +[[disallowed-types]] +path = "gloo_timers::future::IntervalStream" +reason = "use `nimiq_time::interval` instead, it is also supported in non-WASM environments" + +[[disallowed-types]] +path = "gloo_timers::future::TimeoutFuture" +reason = "use `nimiq_time::timeout` instead, it is also supported in non-WASM environments" diff --git a/handel/src/network.rs b/handel/src/network.rs index 5042cf9a5c..c7ab72c5b4 100644 --- a/handel/src/network.rs +++ b/handel/src/network.rs @@ -200,6 +200,7 @@ mod test { use futures::FutureExt; use nimiq_collections::BitSet; use nimiq_test_log::test; + use nimiq_time::sleep; use parking_lot::Mutex; use serde::{Deserialize, Serialize}; @@ -239,7 +240,7 @@ mod test { self.0.lock().push((update, node_id)); async move { - nimiq_time::sleep(Duration::from_millis(100)).await; + sleep(Duration::from_millis(100)).await; Ok(()) } } @@ -296,7 +297,7 @@ mod test { // Clear the buffer so test starts from scratch t.lock().clear(); // Needed because the send also sleeps - nimiq_time::sleep(Duration::from_millis(110)).await; + sleep(Duration::from_millis(110)).await; assert_eq!(0, t.lock().len()); send(&mut sender, 0); @@ -320,7 +321,7 @@ mod test { assert_eq!(10, t.lock().len()); // Wait for the futures to resolve, imitating a delay - nimiq_time::sleep(Duration::from_millis(150)).await; + sleep(Duration::from_millis(150)).await; // Send some more send(&mut sender, 9); // Not a Duplicate, this should be accepted send(&mut sender, 8); // Not a Duplicate, this should be accepted diff --git a/lib/Cargo.toml b/lib/Cargo.toml index 62fb4d4ccd..0ae9e94898 100644 --- a/lib/Cargo.toml +++ b/lib/Cargo.toml @@ -80,6 +80,7 @@ nimiq-network-interface = { workspace = true } nimiq-primitives = { workspace = true, features = ["networks"] } nimiq-rpc-server = { workspace = true, optional = true } nimiq-serde = { workspace = true } +nimiq-time = { workspace = true } nimiq-utils = { workspace = true, features = ["time", "key-store"] } nimiq-validator = { workspace = true, optional = true, features = [ "trusted_push", diff --git a/lib/src/extras/signal_handling.rs b/lib/src/extras/signal_handling.rs index e1065c6d29..ed127912f8 100644 --- a/lib/src/extras/signal_handling.rs +++ b/lib/src/extras/signal_handling.rs @@ -1,6 +1,8 @@ +use std::time::Duration; + +use nimiq_time::sleep; use nimiq_utils::spawn; use signal_hook::{consts::SIGINT, iterator::Signals}; -use tokio::time::{sleep, Duration}; pub fn initialize_signal_handler() { let signals = Signals::new([SIGINT]); diff --git a/metrics-server/Cargo.toml b/metrics-server/Cargo.toml index f3dc1b45a7..fe5a1d63b3 100644 --- a/metrics-server/Cargo.toml +++ b/metrics-server/Cargo.toml @@ -21,7 +21,7 @@ workspace = true [dependencies] bytes = "1.8" -futures = "0.3" +futures = { workspace = true } http-body-util = { version = "0.1" } hyper = { version = "1.5" } hyper-util = { version = "0.1", features = ["server-auto", "tokio"] } @@ -44,6 +44,7 @@ nimiq-mempool = { workspace = true, features = ["metrics"] } nimiq-network-interface = { workspace = true } nimiq-network-libp2p = { workspace = true, features = ["metrics"] } nimiq-primitives = { workspace = true, features = ["coin"], optional = true } +nimiq-time = { workspace = true } nimiq-utils = { workspace = true, features = ["spawn"] } [features] diff --git a/metrics-server/src/tokio_runtime.rs b/metrics-server/src/tokio_runtime.rs index 9276eef1f4..ed731070e9 100644 --- a/metrics-server/src/tokio_runtime.rs +++ b/metrics-server/src/tokio_runtime.rs @@ -1,5 +1,6 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; +use nimiq_time::interval; use parking_lot::RwLock; use prometheus_client::{metrics::gauge::Gauge, registry::Registry}; use tokio_metrics::{RuntimeMetrics, RuntimeMonitor}; @@ -114,7 +115,7 @@ impl TokioRuntimeMetrics { runtime_monitor: RuntimeMonitor, ) { let tokio_rt_metrics = tokio_rt_metrics.clone(); - let mut interval = tokio::time::interval(Duration::from_secs(TOKIO_METRICS_FREQ_SECS)); + let mut interval = interval(Duration::from_secs(TOKIO_METRICS_FREQ_SECS)); let mut runtime_intervals = runtime_monitor.intervals(); loop { diff --git a/metrics-server/src/tokio_task.rs b/metrics-server/src/tokio_task.rs index 2d36af2d7e..2078dca6fe 100644 --- a/metrics-server/src/tokio_task.rs +++ b/metrics-server/src/tokio_task.rs @@ -1,5 +1,7 @@ use std::{collections::HashMap, sync::Arc, time::Duration}; +use futures::StreamExt as _; +use nimiq_time::interval; use parking_lot::RwLock; use prometheus_client::{metrics::gauge::Gauge, registry::Registry}; use tokio_metrics::{TaskMetrics, TaskMonitor}; @@ -83,11 +85,11 @@ impl TokioTaskMetrics { runtime_monitor: TaskMonitor, ) { let tokio_task_metrics = tokio_task_metrics.clone(); - let mut interval = tokio::time::interval(Duration::from_secs(TOKIO_METRICS_FREQ_SECS)); + let mut interval = interval(Duration::from_secs(TOKIO_METRICS_FREQ_SECS)); let mut runtime_intervals = runtime_monitor.intervals(); loop { - interval.tick().await; + interval.next().await; if let Some(interval) = runtime_intervals.next() { tokio_task_metrics .write() diff --git a/network-libp2p/Cargo.toml b/network-libp2p/Cargo.toml index 2e91704b09..fdb476c985 100644 --- a/network-libp2p/Cargo.toml +++ b/network-libp2p/Cargo.toml @@ -24,7 +24,6 @@ async-trait = "0.1" base64 = "0.22" bytes = "1.8" futures = { workspace = true } -futures-timer = "3.0" hex = "0.4" instant = { version = "0.1", features = ["wasm-bindgen"] } ip_network = "0.4" diff --git a/network-libp2p/src/connection_pool/behaviour.rs b/network-libp2p/src/connection_pool/behaviour.rs index fef1be89b7..757bde0415 100644 --- a/network-libp2p/src/connection_pool/behaviour.rs +++ b/network-libp2p/src/connection_pool/behaviour.rs @@ -1,5 +1,6 @@ use std::{ collections::{hash_map::Entry, BTreeMap, BTreeSet, HashMap, HashSet, VecDeque}, + future::Future, net::IpAddr, pin::Pin, sync::Arc, @@ -7,8 +8,7 @@ use std::{ time::Duration, }; -use futures::{future::BoxFuture, Future, FutureExt, StreamExt}; -use instant::Instant; +use futures::{FutureExt, StreamExt}; use ip_network::IpNetwork; use libp2p::{ core::{multiaddr::Protocol, transport::PortUse, ConnectedPoint, Endpoint}, @@ -21,7 +21,7 @@ use libp2p::{ Multiaddr, PeerId, TransportError, }; use nimiq_network_interface::{network::CloseReason, peer_info::Services}; -use nimiq_time::{interval, sleep_until, Interval}; +use nimiq_time::{interval, sleep_until, Instant, Interval, Sleep}; use nimiq_utils::WakerExt as _; use parking_lot::RwLock; use rand::{seq::IteratorRandom, thread_rng}; @@ -102,7 +102,7 @@ struct ConnectionState { /// List of subsequent banned peers with their unban deadlines in ascending order. unban_deadlines: VecDeque<(T, Instant)>, /// Deadline for first peer that can be unbanned. - unban_timeout: Option<(T, BoxFuture<'static, ()>)>, + unban_timeout: Option<(T, Pin>)>, /// The time that needs to pass to unban a banned peer. ban_time: Duration, /// Set of connection IDs mark as failed. diff --git a/network-libp2p/src/discovery/handler.rs b/network-libp2p/src/discovery/handler.rs index 7a04d03c58..ca2844d074 100644 --- a/network-libp2p/src/discovery/handler.rs +++ b/network-libp2p/src/discovery/handler.rs @@ -1,13 +1,13 @@ use std::{ collections::{HashSet, VecDeque}, + future::Future as _, pin::Pin, sync::Arc, task::{Context, Poll, Waker}, time::Duration, }; -use futures::{FutureExt, Sink, SinkExt, StreamExt}; -use futures_timer::Delay; +use futures::{Sink, SinkExt, StreamExt}; use instant::Instant; use libp2p::{ identity::Keypair, @@ -24,7 +24,7 @@ use libp2p::{ use nimiq_hash::Blake2bHash; use nimiq_network_interface::peer_info::Services; use nimiq_serde::DeserializeError; -use nimiq_time::{interval, Interval}; +use nimiq_time::{interval, sleep, Interval, Sleep}; use nimiq_utils::tagged_signing::TaggedKeyPair; use parking_lot::RwLock; use rand::{seq::IteratorRandom, thread_rng}; @@ -144,7 +144,7 @@ pub struct Handler { state: HandlerState, /// Future that fires on a state change timeout - state_timeout: Option, + state_timeout: Option>>, /// Services filter sent to us by this peer. services_filter: Services, @@ -248,7 +248,7 @@ impl Handler { fn check_initialized(&mut self) { if self.inbound.is_some() && self.outbound.is_some() { self.state = HandlerState::SendHandshake; - self.state_timeout = Some(Delay::new(Self::STATE_TRANSITION_TIMEOUT)); + self.state_timeout = Some(Box::pin(sleep(Self::STATE_TRANSITION_TIMEOUT))); self.waker .take() @@ -352,7 +352,7 @@ impl ConnectionHandler for Handler { loop { // Check if we hit the state transition timeout if let Some(ref mut state_timeout) = self.state_timeout { - if state_timeout.poll_unpin(cx).is_ready() { + if state_timeout.as_mut().poll(cx).is_ready() { return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( HandlerOutEvent::Error(Error::StateTransitionTimeout { state: self.state }), )); @@ -382,7 +382,7 @@ impl ConnectionHandler for Handler { HandlerState::Init => { // Request outbound substream self.state = HandlerState::OpenSubstream; - self.state_timeout = Some(Delay::new(Self::STATE_TRANSITION_TIMEOUT)); + self.state_timeout = Some(Box::pin(sleep(Self::STATE_TRANSITION_TIMEOUT))); return Poll::Ready(ConnectionHandlerEvent::OutboundSubstreamRequest { protocol: SubstreamProtocol::new(DiscoveryProtocol, ()), @@ -417,7 +417,7 @@ impl ConnectionHandler for Handler { } self.state = HandlerState::ReceiveHandshake; - self.state_timeout = Some(Delay::new(Self::STATE_TRANSITION_TIMEOUT)); + self.state_timeout = Some(Box::pin(sleep(Self::STATE_TRANSITION_TIMEOUT))); } HandlerState::ReceiveHandshake => { @@ -483,7 +483,7 @@ impl ConnectionHandler for Handler { self.state = HandlerState::ReceiveHandshakeAck; self.state_timeout = - Some(Delay::new(Self::STATE_TRANSITION_TIMEOUT)); + Some(Box::pin(sleep(Self::STATE_TRANSITION_TIMEOUT))); return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour( HandlerOutEvent::ObservedAddress { observed_address }, diff --git a/network-libp2p/src/swarm.rs b/network-libp2p/src/swarm.rs index db6e216260..cb1646b627 100644 --- a/network-libp2p/src/swarm.rs +++ b/network-libp2p/src/swarm.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, num::NonZeroU8, sync::Arc}; +use std::{collections::HashMap, num::NonZeroU8, sync::Arc, time::Duration}; use futures::StreamExt; #[cfg(feature = "metrics")] @@ -231,14 +231,14 @@ fn new_transport( .upgrade(core::upgrade::Version::V1) .authenticate(noise::Config::new(keypair).unwrap()) .multiplex(yamux) - .timeout(std::time::Duration::from_secs(20)) + .timeout(Duration::from_secs(20)) .boxed()) } else { Ok(transport .upgrade(core::upgrade::Version::V1) .authenticate(noise::Config::new(keypair).unwrap()) .multiplex(yamux) - .timeout(std::time::Duration::from_secs(20)) + .timeout(Duration::from_secs(20)) .boxed()) } } else { @@ -271,14 +271,14 @@ fn new_transport( .upgrade(core::upgrade::Version::V1) .authenticate(noise::Config::new(keypair).unwrap()) .multiplex(yamux) - .timeout(std::time::Duration::from_secs(20)) + .timeout(Duration::from_secs(20)) .boxed()) } else { Ok(transport .upgrade(core::upgrade::Version::V1) .authenticate(noise::Config::new(keypair).unwrap()) .multiplex(yamux) - .timeout(std::time::Duration::from_secs(20)) + .timeout(Duration::from_secs(20)) .boxed()) } } diff --git a/pow-migration/Cargo.toml b/pow-migration/Cargo.toml index 7dba1d96d4..1876c7e57e 100644 --- a/pow-migration/Cargo.toml +++ b/pow-migration/Cargo.toml @@ -48,6 +48,7 @@ nimiq = { workspace = true, features = [ ] } nimiq-primitives = { workspace = true, features = ["policy"] } nimiq-serde = { workspace = true } +nimiq-time = { workspace = true } nimiq-transaction = { workspace = true } nimiq-utils = { workspace = true, features = ["spawn"] } nimiq-vrf = { workspace = true } diff --git a/pow-migration/src/history.rs b/pow-migration/src/history.rs index 925dac42a7..bb59927d96 100644 --- a/pow-migration/src/history.rs +++ b/pow-migration/src/history.rs @@ -16,13 +16,11 @@ use nimiq_rpc::{ }, Client, }; +use nimiq_time::sleep; use nimiq_transaction::{ historic_transaction::HistoricTransaction, ExecutedTransaction, Transaction, TransactionFlags, }; -use tokio::{ - sync::{mpsc, watch}, - time::sleep, -}; +use tokio::sync::{mpsc, watch}; use crate::{async_retryer, types::HistoryError}; diff --git a/pow-migration/src/lib.rs b/pow-migration/src/lib.rs index bcb0a59877..9395caa98a 100644 --- a/pow-migration/src/lib.rs +++ b/pow-migration/src/lib.rs @@ -20,7 +20,7 @@ use nimiq_keys::Address; use nimiq_primitives::networks::NetworkId; use nimiq_rpc::Client; use nimiq_serde::Serialize; -use tokio::time::sleep; +use nimiq_time::sleep; use types::GenesisValidator; use crate::{ diff --git a/pow-migration/src/main.rs b/pow-migration/src/main.rs index 8086abf4e4..909ee5cfa1 100644 --- a/pow-migration/src/main.rs +++ b/pow-migration/src/main.rs @@ -21,11 +21,9 @@ use nimiq_pow_migration::{ }; use nimiq_primitives::networks::NetworkId; use nimiq_rpc::Client; +use nimiq_time::sleep; use nimiq_utils::spawn; -use tokio::{ - sync::{mpsc, watch}, - time::sleep, -}; +use tokio::sync::{mpsc, watch}; use url::Url; /// We check for online transactions every this amount of blocks diff --git a/tendermint/src/tendermint.rs b/tendermint/src/tendermint.rs index b9bcf45c6c..d8522dcbaf 100644 --- a/tendermint/src/tendermint.rs +++ b/tendermint/src/tendermint.rs @@ -12,7 +12,7 @@ use futures::{ stream::{BoxStream, Stream, StreamExt}, }; use nimiq_collections::BitSet; -use nimiq_time::sleep; +use nimiq_time::{sleep, Sleep}; use nimiq_utils::stream::{FuturesUnordered, SelectAll}; use rand::{thread_rng, Rng}; use tokio::{sync::mpsc, time::Duration}; @@ -74,7 +74,7 @@ pub struct Tendermint { /// In case a timeout is required it will be stored here until elapsed or no longer necessary. /// Must be cleared in both cases. - pub(crate) timeout: Option>, + pub(crate) timeout: Option>>, /// Keeps track of a state return that still needs to happen. Whenever a proposal is Rejected/Ignored/Accepted /// it will be returned as a stream item but the state change following it must be returned on the next call @@ -169,7 +169,7 @@ impl Tendermint { TProtocol::TIMEOUT_INIT + self.state.current_round as u64 * TProtocol::TIMEOUT_DELTA, ); - self.timeout = Some(sleep(duration).boxed()); + self.timeout = Some(Box::pin(sleep(duration))); } } diff --git a/tendermint/tests/common/helper.rs b/tendermint/tests/common/helper.rs index f20d361f4c..d9dc5763f3 100644 --- a/tendermint/tests/common/helper.rs +++ b/tendermint/tests/common/helper.rs @@ -4,6 +4,7 @@ use std::{ time::Duration, }; +use nimiq_time::timeout; use tokio::sync::mpsc; use super::*; @@ -135,7 +136,7 @@ pub fn expect_state(tm: &mut Tendermint) -> Stat pub async fn await_state(tm: &mut Tendermint) -> State { // Only wait for a timeout, otherwise this could diverge - match nimiq_time::timeout(Duration::from_millis(1000), tm.next()).await { + match timeout(Duration::from_millis(1000), tm.next()).await { Ok(Some(Return::Update(u))) => u, Ok(Some(_other_returns)) => panic!("Tendermint should have returned a state."), Ok(None) => panic!("Tendermint should not have terminated."), diff --git a/time/src/gloo.rs b/time/src/gloo.rs index 5a74f2e770..2d16c757f9 100644 --- a/time/src/gloo.rs +++ b/time/src/gloo.rs @@ -20,6 +20,7 @@ pub fn interval(period: Duration) -> Interval { pub type Sleep = SendWrapper; pub fn sleep(duration: Duration) -> Sleep { + #[allow(clippy::disallowed_types)] SendWrapper::new(TimeoutFuture::new(millis(duration))) } diff --git a/time/src/lib.rs b/time/src/lib.rs index 9d7d9380db..2d43cf484a 100644 --- a/time/src/lib.rs +++ b/time/src/lib.rs @@ -28,6 +28,7 @@ pub struct Interval { // TODO: decide on first tick. right now or after one period? pub fn interval(period: Duration) -> Interval { limit_duration(period); + #[allow(clippy::disallowed_methods)] Interval { sys: sys::interval(period), } @@ -54,6 +55,7 @@ pin_project! { pub fn sleep(duration: Duration) -> Sleep { limit_duration(duration); + #[allow(clippy::disallowed_methods)] Sleep { sys: sys::sleep(duration), } @@ -95,6 +97,7 @@ pin_project! { pub fn timeout(timeout: Duration, future: F) -> Timeout { limit_duration(timeout); + #[allow(clippy::disallowed_methods)] Timeout { sys: sys::timeout(timeout, future), } diff --git a/time/src/tokio.rs b/time/src/tokio.rs index 447b5f626d..b359326ca7 100644 --- a/time/src/tokio.rs +++ b/time/src/tokio.rs @@ -8,9 +8,11 @@ pub use self::tokio::{sleep, timeout, Sleep, Timeout}; use crate::Instant; pub fn interval(period: Duration) -> Interval { + #[allow(clippy::disallowed_methods)] Interval::new(interval_at(tokio::Instant::now() + period, period)) } pub fn sleep_until(deadline: Instant) -> Sleep { + #[allow(clippy::disallowed_methods)] tokio::sleep_until(tokio::Instant::from_std(deadline)) } diff --git a/web-client/Cargo.toml b/web-client/Cargo.toml index 76ab5fb7d6..9f307f8522 100644 --- a/web-client/Cargo.toml +++ b/web-client/Cargo.toml @@ -48,6 +48,7 @@ nimiq-keys = { workspace = true } nimiq-network-interface = { workspace = true } nimiq-primitives = { workspace = true, features = ["coin", "networks", "ts-types"] } nimiq-serde = { workspace = true } +nimiq-time = { workspace = true } nimiq-transaction = { workspace = true, features = ["ts-types"] } nimiq-transaction-builder = { workspace = true } nimiq-utils = { workspace = true, features = ["merkle", "otp"] } diff --git a/web-client/src/client/lib.rs b/web-client/src/client/lib.rs index 1a84db30f0..483c070540 100644 --- a/web-client/src/client/lib.rs +++ b/web-client/src/client/lib.rs @@ -6,12 +6,10 @@ use std::{ }, rc::Rc, str::FromStr, + time::Duration, }; -use futures::{ - future::{select, Either}, - StreamExt, -}; +use futures::StreamExt; use js_sys::{global, Array, Function, JsString}; use log::level_filters::LevelFilter; use nimiq::client::ConsensusProxy; @@ -30,6 +28,7 @@ use nimiq_network_interface::{ Multiaddr, }; use nimiq_primitives::policy::Policy; +use nimiq_time::timeout; use nimiq_utils::spawn_local; use tokio::sync::oneshot; use tsify::Tsify; @@ -609,18 +608,16 @@ impl Client { // Actually send the transaction consensus.send_transaction(tx.native()).await?; - let timeout = gloo_timers::future::TimeoutFuture::new(10_000); - - // Wait for the transaction (will be None if the timeout is reached first) - let res = select(receiver, timeout).await; + // Wait for the transaction (will be Err(_) if the timeout is reached first) + let res = timeout(Duration::from_millis(10_000), receiver).await; - let maybe_details = if let Either::Left((res, _)) = res { - res.ok() - } else { + if res.is_err() { // If the timeout triggered, delete our oneshot sender self.transaction_oneshots.borrow_mut().remove(hash); - None - }; + } + + // Throw away the error. + let maybe_details = res.ok().and_then(|res| res.ok()); drop(address_subscription);