diff --git a/runtime/src/installed_scheduler_pool.rs b/runtime/src/installed_scheduler_pool.rs index cf5e65b182f588..e84677e6007fff 100644 --- a/runtime/src/installed_scheduler_pool.rs +++ b/runtime/src/installed_scheduler_pool.rs @@ -292,6 +292,8 @@ impl WaitReason { pub enum SchedulerStatus { /// Unified scheduler is disabled or installed scheduler is consumed by wait_for_termination(). /// Note that transition to Unavailable from {Active, Stale} is one-way (i.e. one-time). + /// Also, this variant is transiently used as a placeholder internally when transitioning + /// scheduler statuses, which isn't observable unless panic is happening. Unavailable, /// Scheduler is installed into a bank; could be running or just be idling. /// This will be transitioned to Stale after certain time has passed if its bank hasn't been @@ -329,7 +331,7 @@ impl SchedulerStatus { return; } let Self::Active(scheduler) = mem::replace(self, Self::Unavailable) else { - unreachable!("not active: {:?}", self); + unreachable!("not active: {self:?}"); }; let (pool, result_with_timings) = f(scheduler); *self = Self::Stale(pool, result_with_timings); @@ -549,7 +551,8 @@ impl BankWithSchedulerInner { let scheduler = self.scheduler.read().unwrap(); // Re-register a new timeout listener only after acquiring the read lock; // Otherwise, the listener would again put scheduler into Stale before the read - // lock under an extremely-rare race condition, causing panic below. + // lock under an extremely-rare race condition, causing panic below in + // active_scheduler(). pool.register_timeout_listener(self.do_create_timeout_listener()); f(scheduler.active_scheduler()) } diff --git a/unified-scheduler-pool/src/lib.rs b/unified-scheduler-pool/src/lib.rs index c69df9c606bc64..ed7f1407a3c37f 100644 --- a/unified-scheduler-pool/src/lib.rs +++ b/unified-scheduler-pool/src/lib.rs @@ -340,6 +340,8 @@ where context: SchedulingContext, result_with_timings: ResultWithTimings, ) -> S { + assert_matches!(result_with_timings, (Ok(_), _)); + // pop is intentional for filo, expecting relatively warmed-up scheduler due to having been // returned recently if let Some((inner, _pooled_at)) = self.scheduler_inners.lock().expect("not poisoned").pop() @@ -1711,6 +1713,10 @@ mod tests { &CheckPoint::TimeoutListenerTriggered(0), &CheckPoint::TimeoutListenerTriggered(1), &TestCheckPoint::AfterTimeoutListenerTriggered, + &TestCheckPoint::BeforeTimeoutListenerTriggered, + &CheckPoint::TimeoutListenerTriggered(0), + &CheckPoint::TimeoutListenerTriggered(1), + &TestCheckPoint::AfterTimeoutListenerTriggered, ]); let ignored_prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64)); @@ -1778,6 +1784,11 @@ mod tests { bank.schedule_transaction_executions([(tx_after_stale, &1)].into_iter()) .unwrap(); + // Observe second occurrence of TimeoutListenerTriggered(1), which indicates a new timeout + // lister is registered correctly again for reactivated scheduler. + sleepless_testing::at(TestCheckPoint::BeforeTimeoutListenerTriggered); + sleepless_testing::at(TestCheckPoint::AfterTimeoutListenerTriggered); + let (result, timings) = bank.wait_for_completed_scheduler().unwrap(); assert_matches!(result, Ok(())); // ResultWithTimings should be carried over across active=>stale=>active transitions. diff --git a/unified-scheduler-pool/src/sleepless_testing.rs b/unified-scheduler-pool/src/sleepless_testing.rs index 9c2213f657e86a..901a2f7c4fded7 100644 --- a/unified-scheduler-pool/src/sleepless_testing.rs +++ b/unified-scheduler-pool/src/sleepless_testing.rs @@ -26,20 +26,21 @@ pub(crate) trait BuilderTracked: Sized { } #[cfg(not(test))] -pub(crate) use sleepless_testing_dummy::*; +pub(crate) use dummy::*; #[cfg(test)] -pub(crate) use sleepless_testing_real::*; +pub(crate) use real::*; #[cfg(test)] -mod sleepless_testing_real { +mod real { use { lazy_static::lazy_static, + log::trace, std::{ cmp::Ordering::{Equal, Greater, Less}, - collections::{HashMap, HashSet}, + collections::HashMap, fmt::Debug, sync::{Arc, Condvar, Mutex}, - thread::{current, JoinHandle, ThreadId}, + thread::{current, panicking, JoinHandle, ThreadId}, }, }; @@ -47,7 +48,7 @@ mod sleepless_testing_real { struct Progress { _name: String, check_points: Vec, - current_check_point: Mutex, + current_index: Mutex, condvar: Condvar, } @@ -61,61 +62,88 @@ mod sleepless_testing_real { .into_iter() .chain(check_points) .collect::>(); - let check_points_set = check_points.iter().collect::>(); - assert_eq!(check_points.len(), check_points_set.len()); - Self { _name: name, check_points, - current_check_point: Mutex::new(initial_check_point), + current_index: Mutex::new(0), condvar: Condvar::new(), } } fn change_current_check_point(&self, anchored_check_point: String) { - let Some(anchored_index) = self - .check_points - .iter() - .position(|check_point| check_point == &anchored_check_point) + let mut current_index = self.current_index.lock().unwrap(); + + let Some(anchored_index) = self.anchored_index(*current_index, &anchored_check_point) else { - // Ignore unrecognizable checkpoints... + trace!("Ignore {} at {:?}", anchored_check_point, current()); return; }; - let mut current_check_point = self.current_check_point.lock().unwrap(); - - let should_change = - match anchored_index.cmp(&self.expected_next_index(¤t_check_point)) { - Equal => true, - Greater => { - // anchor is one of future check points; block the current thread until - // that happens - current_check_point = self - .condvar - .wait_while(current_check_point, |current_check_point| { - anchored_index != self.expected_next_index(current_check_point) - }) - .unwrap(); - true - } - // anchor is already observed. - Less => false, - }; + let next_index = self.expected_next_index(*current_index); + let should_change = match anchored_index.cmp(&next_index) { + Equal => true, + Greater => { + trace!("Blocked on {} at {:?}", anchored_check_point, current()); + // anchor is one of future check points; block the current thread until + // that happens + current_index = self + .condvar + .wait_while(current_index, |&mut current_index| { + let Some(anchored_index) = + self.anchored_index(current_index, &anchored_check_point) + else { + // don't wait. seems the progress is made by other threads + // anchored to the same checkpoint. + return false; + }; + let next_index = self.expected_next_index(current_index); + + // determine we should wait further or not + match anchored_index.cmp(&next_index) { + Equal => false, + Greater => { + trace!( + "Re-blocked on {} ({} != {}) at {:?}", + anchored_check_point, + anchored_index, + next_index, + current() + ); + true + } + Less => unreachable!(), + } + }) + .unwrap(); + true + } + Less => unreachable!(), + }; if should_change { - *current_check_point = anchored_check_point; + if *current_index != anchored_index { + trace!("Progressed to: {} at {:?}", anchored_check_point, current()); + *current_index = anchored_index; + } + self.condvar.notify_all(); } } - fn expected_next_index(&self, current_check_point: &String) -> usize { - let current_index = self - .check_points - .iter() - .position(|check_point| check_point == current_check_point) - .unwrap(); + fn expected_next_index(&self, current_index: usize) -> usize { current_index.checked_add(1).unwrap() } + + fn anchored_index( + &self, + current_index: usize, + anchored_check_point: &String, + ) -> Option { + self.check_points[current_index..] + .iter() + .position(|check_point| check_point == anchored_check_point) + .map(|subslice_index| subslice_index.checked_add(current_index).unwrap()) + } } lazy_static! { @@ -142,11 +170,13 @@ mod sleepless_testing_real { } fn deactivate(&self) { - assert_eq!( - *self.0.check_points.last().unwrap(), - *self.0.current_check_point.lock().unwrap(), - "unfinished progress" - ); + if !panicking() { + assert_eq!( + self.0.check_points.len().checked_sub(1).unwrap(), + *self.0.current_index.lock().unwrap(), + "unfinished progress" + ); + } THREAD_REGISTRY.lock().unwrap().remove(&self.1).unwrap(); } } @@ -299,7 +329,7 @@ mod sleepless_testing_real { } #[cfg(not(test))] -mod sleepless_testing_dummy { +mod dummy { use std::fmt::Debug; #[inline]