Skip to content

Commit

Permalink
Clean up nimiq-time crate
Browse files Browse the repository at this point in the history
- Make sure to only export API supported on all platforms.
- Make sure not to leak implementation details.
- All return values are named types now.
  • Loading branch information
hrxi committed Nov 28, 2024
1 parent eca46dc commit f831372
Show file tree
Hide file tree
Showing 5 changed files with 171 additions and 56 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ keywords.workspace = true
workspace = true

[dependencies]
futures = "0.3"
futures = { workspace = true }
gloo-timers = { version = "0.3", features = ["futures"] }
instant = { version = "0.1", features = ["wasm-bindgen"] }
pin-project-lite = "0.2.15"
send_wrapper = { version = "0.6", features = ["futures"] }
tokio = { version = "1.41", features = ["time"] }
tokio-stream = { version = "0.1", features = ["time"] }
78 changes: 48 additions & 30 deletions time/src/gloo.rs
Original file line number Diff line number Diff line change
@@ -1,48 +1,66 @@
use std::{convert::TryInto, future::Future, pin::pin, time::Duration};
use std::{
convert::TryInto,
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::future::{select, Either};
use gloo_timers::future::{IntervalStream, TimeoutFuture};
use instant::Instant;
use pin_project_lite::pin_project;
use send_wrapper::SendWrapper;

pub type Interval = SendWrapper<IntervalStream>;

pub fn interval(period: Duration) -> Interval {
assert!(!period.is_zero());
let millis = period
.as_millis()
.try_into()
.expect("Period as millis must fit in u32");
SendWrapper::new(IntervalStream::new(millis))
SendWrapper::new(IntervalStream::new(millis(period)))
}

pub fn timeout<F: Future>(
duration: Duration,
future: F,
) -> impl Future<Output = Result<F::Output, ()>> {
SendWrapper::new(timeout_inner(duration, future))
pub type Sleep = SendWrapper<TimeoutFuture>;

pub fn sleep(duration: Duration) -> Sleep {
SendWrapper::new(TimeoutFuture::new(millis(duration)))
}

async fn timeout_inner<F: Future>(duration: Duration, future: F) -> Result<F::Output, ()> {
let millis = duration
.as_millis()
.try_into()
.expect("Duration as millis must fit in u32");
let timeout = TimeoutFuture::new(millis);
match select(pin!(future), timeout).await {
Either::Left((res, _)) => Ok(res),
Either::Right(_) => Err(()),
pub fn sleep_until(deadline: Instant) -> Sleep {
sleep(deadline.saturating_duration_since(Instant::now()))
}

pin_project! {
pub struct Timeout<F: Future> {
#[pin]
future: F,
#[pin]
deadline: Sleep,
}
}

pub fn sleep(duration: Duration) -> impl Future<Output = ()> {
let millis = duration
.as_millis()
.try_into()
.expect("Duration as millis must fit in u32");
SendWrapper::new(TimeoutFuture::new(millis))
pub fn timeout<F: Future>(duration: Duration, future: F) -> Timeout<F> {
Timeout {
future: future,
deadline: sleep(duration),
}
}

pub fn sleep_until(deadline: Instant) -> impl Future<Output = ()> {
sleep(deadline.saturating_duration_since(Instant::now()))
impl<F: Future> Future for Timeout<F> {
type Output = Result<F::Output, ()>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<F::Output, ()>> {
let this = self.project();
if let Poll::Ready(result) = this.future.poll(cx) {
return Poll::Ready(Ok(result));
}
if let Poll::Ready(_) = this.deadline.poll(cx) {
return Poll::Ready(Err(()));
}
Poll::Pending
}
}

#[track_caller]
fn millis(duration: Duration) -> u32 {
duration
.as_millis()
.try_into()
.expect("Period in milliseconds must fit into a u32")
}
113 changes: 111 additions & 2 deletions time/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,118 @@
use std::{
error::Error,
fmt,
future::Future,
pin::Pin,
task::{Context, Poll},
time::Duration,
};

use futures::{Stream, StreamExt as _};
pub use instant::Instant;
use pin_project_lite::pin_project;

#[cfg(target_family = "wasm")]
mod gloo;
#[cfg(not(target_family = "wasm"))]
mod tokio;

#[cfg(target_family = "wasm")]
pub use gloo::*;
use gloo as sys;
#[cfg(not(target_family = "wasm"))]
pub use tokio::*;
use tokio as sys;

pub struct Interval {
sys: sys::Interval,
}

// TODO: decide on first tick. right now or after one period?
pub fn interval(period: Duration) -> Interval {
limit_duration(period);
Interval {
sys: sys::interval(period),
}
}

impl Stream for Interval {
type Item = ();
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<()>> {
self.sys
.poll_next_unpin(cx)
.map(|option| option.map(|_| ()))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(usize::MAX, None)
}
}

pin_project! {
pub struct Sleep {
#[pin]
sys: sys::Sleep,
}
}

pub fn sleep(duration: Duration) -> Sleep {
limit_duration(duration);
Sleep {
sys: sys::sleep(duration),
}
}

pub fn sleep_until(deadline: Instant) -> Sleep {
if let Some(duration) = deadline.checked_duration_since(Instant::now()) {
limit_duration(duration);
}
Sleep {
sys: sys::sleep_until(deadline),
}
}

impl Future for Sleep {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
self.project().sys.poll(cx)
}
}

#[derive(Debug)]
pub struct Elapsed(());

impl fmt::Display for Elapsed {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
"timeout has elapsed".fmt(f)
}
}

impl Error for Elapsed {}

pin_project! {
pub struct Timeout<F: Future> {
#[pin]
sys: sys::Timeout<F>,
}
}

pub fn timeout<F: Future>(timeout: Duration, future: F) -> Timeout<F> {
limit_duration(timeout);
Timeout {
sys: sys::timeout(timeout, future),
}
}

impl<F: Future> Future for Timeout<F> {
type Output = Result<F::Output, Elapsed>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<F::Output, Elapsed>> {
self.project().sys.poll(cx).map_err(|_| Elapsed(()))
}
}

#[track_caller]
fn limit_duration(duration: Duration) {
// Limit the period to the maximum allowed by gloo-timers to get consistent
// behaviour across both implementations.
assert!(
duration.as_millis() <= u32::MAX as u128,
"Period in milliseconds must fit into a u32",
);
}
30 changes: 8 additions & 22 deletions time/src/tokio.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1,16 @@
use std::{future::Future, time::Duration};
use std::time::Duration;

use instant::Instant;
use tokio::time::{
interval_at, sleep as tokio_sleep, sleep_until as tokio_sleep_until, timeout as tokio_timeout,
Instant as TokioInstant, Sleep, Timeout,
};
use tokio::time as tokio;
pub use tokio_stream::wrappers::IntervalStream as Interval;

pub fn interval(period: Duration) -> Interval {
// Limit the period to the maximum allowed by gloo-timers to get consistent behaviour
// across both implementations.
assert!(
period.as_millis() <= u32::MAX as u128,
"Period as millis must fit in u32"
);
Interval::new(interval_at(TokioInstant::now() + period, period))
}
use self::tokio::interval_at;
pub use self::tokio::{sleep, timeout, Sleep, Timeout};
use crate::Instant;

pub fn timeout<F: Future>(duration: Duration, future: F) -> Timeout<F> {
tokio_timeout(duration, future)
}

pub fn sleep(duration: Duration) -> Sleep {
tokio_sleep(duration)
pub fn interval(period: Duration) -> Interval {
Interval::new(interval_at(tokio::Instant::now() + period, period))
}

pub fn sleep_until(deadline: Instant) -> Sleep {
tokio_sleep_until(TokioInstant::from_std(deadline))
tokio::sleep_until(tokio::Instant::from_std(deadline))
}

0 comments on commit f831372

Please sign in to comment.