Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add missing Arc blocking methods #71

Merged
merged 3 commits into from
Nov 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ pin_project_lite::pin_project! {
unsafe impl<T: Send + ?Sized> Send for Lock<'_, T> {}
unsafe impl<T: Sync + ?Sized> Sync for Lock<'_, T> {}

impl<T: ?Sized> fmt::Debug for LockInner<'_, T> {
impl<T: ?Sized> fmt::Debug for Lock<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Lock { .. }")
}
Expand Down
132 changes: 132 additions & 0 deletions src/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,41 @@ impl<T> RwLock<T> {
pub fn read_arc<'a>(self: &'a Arc<Self>) -> ReadArc<'a, T> {
ReadArc::new(self.raw.read(), self)
}

/// Acquires an owned, reference-counted read lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// Note that attempts to acquire a read lock will block if there are also concurrent attempts
/// to acquire a write lock.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`read_arc`][`RwLock::read_arc`] method,
/// this method will block the current thread until the read lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::RwLock;
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.read_arc_blocking();
/// assert_eq!(*reader, 1);
///
/// assert!(lock.try_read().is_some());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn read_arc_blocking(self: &Arc<Self>) -> RwLockReadGuardArc<T> {
self.read_arc().wait()
}
}

impl<T: ?Sized> RwLock<T> {
Expand Down Expand Up @@ -347,6 +382,47 @@ impl<T: ?Sized> RwLock<T> {
self.upgradable_read().wait()
}

/// Attempts to acquire an owned, reference-counted read lock
/// with the possiblity to upgrade to a write lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// Upgradable read lock reserves the right to be upgraded to a write lock, which means there
/// can be at most one upgradable read lock at a time.
///
/// Note that attempts to acquire an upgradable read lock will block if there are concurrent
/// attempts to acquire another upgradable read lock or a write lock.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`upgradable_read_arc`][`RwLock::upgradable_read_arc`]
/// method, this method will block the current thread until the read lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::{RwLock, RwLockUpgradableReadGuardArc};
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.upgradable_read_arc_blocking();
/// assert_eq!(*reader, 1);
/// assert_eq!(*lock.try_read().unwrap(), 1);
///
/// let mut writer = RwLockUpgradableReadGuardArc::upgrade_blocking(reader);
/// *writer = 2;
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn upgradable_read_arc_blocking(self: &Arc<Self>) -> RwLockUpgradableReadGuardArc<T> {
self.upgradable_read_arc().wait()
}

/// Attempts to acquire an owned, reference-counted read lock with the possiblity to
/// upgrade to a write lock.
///
Expand Down Expand Up @@ -545,6 +621,36 @@ impl<T: ?Sized> RwLock<T> {
WriteArc::new(self.raw.write(), self)
}

/// Acquires an owned, reference-counted write lock.
///
/// Returns a guard that releases the lock when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`write_arc`][RwLock::write_arc] method, this method will
/// block the current thread until the write lock is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a lock can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::RwLock;
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let writer = lock.write_arc_blocking();
/// assert!(lock.try_read().is_none());
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn write_arc_blocking(self: &Arc<Self>) -> RwLockWriteGuardArc<T> {
self.write_arc().wait()
}

/// Returns a mutable reference to the inner value.
///
/// Since this call borrows the lock mutably, no actual locking takes place. The mutable borrow
Expand Down Expand Up @@ -1067,6 +1173,32 @@ impl<T: ?Sized> RwLockUpgradableReadGuardArc<T> {
)
}
}

/// Upgrades into a write lock.
///
/// # Blocking
///
/// This function will block the current thread until it is able to acquire the write lock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::{RwLock, RwLockUpgradableReadGuardArc};
///
/// let lock = Arc::new(RwLock::new(1));
///
/// let reader = lock.upgradable_read_arc_blocking();
/// assert_eq!(*reader, 1);
///
/// let mut writer = RwLockUpgradableReadGuardArc::upgrade_blocking(reader);
/// *writer = 2;
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn upgrade_blocking(guard: Self) -> RwLockWriteGuardArc<T> {
RwLockUpgradableReadGuardArc::upgrade(guard).wait()
}
}

/// A guard that releases the write lock when dropped.
Expand Down
57 changes: 47 additions & 10 deletions src/semaphore.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use core::fmt;
use core::future::Future;
use core::pin::Pin;
use core::sync::atomic::{AtomicUsize, Ordering};
use core::task::{Context, Poll};
use core::task::Poll;

use alloc::sync::Arc;

Expand Down Expand Up @@ -174,10 +173,38 @@ impl Semaphore {
/// # });
/// ```
pub fn acquire_arc(self: &Arc<Self>) -> AcquireArc {
AcquireArc {
AcquireArc::_new(AcquireArcInner {
semaphore: self.clone(),
listener: EventListener::new(),
}
})
}

/// Waits for an owned permit for a concurrent operation.
///
/// Returns a guard that releases the permit when dropped.
///
/// # Blocking
///
/// Rather than using asynchronous waiting, like the [`acquire_arc`][Semaphore::acquire_arc] method,
/// this method will block the current thread until the permit is acquired.
///
/// This method should not be used in an asynchronous context. It is intended to be
/// used in a way that a semaphore can be used in both asynchronous and synchronous contexts.
/// Calling this method in an asynchronous context may result in a deadlock.
///
/// # Examples
///
/// ```
/// use std::sync::Arc;
/// use async_lock::Semaphore;
///
/// let s = Arc::new(Semaphore::new(2));
/// let guard = s.acquire_arc_blocking();
/// ```
#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[inline]
pub fn acquire_arc_blocking(self: &Arc<Self>) -> SemaphoreGuardArc {
self.acquire_arc().wait()
}

/// Adds `n` additional permits to the semaphore.
Expand Down Expand Up @@ -223,7 +250,7 @@ pin_project_lite::pin_project! {
}
}

impl fmt::Debug for AcquireInner<'_> {
impl fmt::Debug for Acquire<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str("Acquire { .. }")
}
Expand Down Expand Up @@ -255,9 +282,15 @@ impl<'a> EventListenerFuture for AcquireInner<'a> {
}
}

pin_project_lite::pin_project! {
easy_wrapper! {
/// The future returned by [`Semaphore::acquire_arc`].
pub struct AcquireArc {
pub struct AcquireArc(AcquireArcInner => SemaphoreGuardArc);
#[cfg(all(feature = "std", not(target_family = "wasm")))]
pub(crate) wait();
}

pin_project_lite::pin_project! {
struct AcquireArcInner {
// The semaphore being acquired.
semaphore: Arc<Semaphore>,

Expand All @@ -273,10 +306,14 @@ impl fmt::Debug for AcquireArc {
}
}

impl Future for AcquireArc {
impl EventListenerFuture for AcquireArcInner {
type Output = SemaphoreGuardArc;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
fn poll_with_strategy<'x, S: Strategy<'x>>(
self: Pin<&mut Self>,
strategy: &mut S,
cx: &mut S::Context,
) -> Poll<Self::Output> {
let mut this = self.project();

loop {
Expand All @@ -287,7 +324,7 @@ impl Future for AcquireArc {
if !this.listener.is_listening() {
this.listener.as_mut().listen(&this.semaphore.event);
} else {
ready!(this.listener.as_mut().poll(cx));
ready!(strategy.poll(this.listener.as_mut(), cx));
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions tests/mutex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ fn smoke_blocking() {
drop(m.lock_blocking());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let m = Arc::new(Mutex::new(()));
drop(m.lock_arc_blocking());
drop(m.lock_arc_blocking());
}

#[test]
fn try_lock() {
let m = Mutex::new(());
Expand Down
18 changes: 18 additions & 0 deletions tests/rwlock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,27 @@ fn smoke_blocking() {
drop(lock.read_blocking());
drop(lock.write_blocking());
drop((lock.read_blocking(), lock.read_blocking()));
let read = lock.read_blocking();
let upgradabe = lock.upgradable_read_blocking();
drop(read);
drop(RwLockUpgradableReadGuard::upgrade_blocking(upgradabe));
drop(lock.write_blocking());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let lock = Arc::new(RwLock::new(()));
drop(lock.read_arc_blocking());
drop(lock.write_arc_blocking());
drop((lock.read_arc_blocking(), lock.read_arc_blocking()));
let read = lock.read_arc_blocking();
let upgradabe = lock.upgradable_read_arc_blocking();
drop(read);
drop(RwLockUpgradableReadGuardArc::upgrade_blocking(upgradabe));
drop(lock.write_arc_blocking());
}

#[test]
fn try_write() {
future::block_on(async {
Expand Down
11 changes: 11 additions & 0 deletions tests/semaphore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ fn smoke_blocking() {
assert!(s.try_acquire().is_some());
}

#[cfg(all(feature = "std", not(target_family = "wasm")))]
#[test]
fn smoke_arc_blocking() {
let s = Arc::new(Semaphore::new(2));
let g1 = s.acquire_arc_blocking();
let _g2 = s.acquire_arc_blocking();
assert!(s.try_acquire().is_none());
drop(g1);
assert!(s.try_acquire().is_some());
}

#[test]
fn add_permits() {
static COUNTER: AtomicUsize = AtomicUsize::new(0);
Expand Down