diff --git a/scylla/src/transport/mod.rs b/scylla/src/transport/mod.rs index 55184aadc6..75b8266d8c 100644 --- a/scylla/src/transport/mod.rs +++ b/scylla/src/transport/mod.rs @@ -18,6 +18,7 @@ pub mod retry_policy; pub mod session; pub mod session_builder; pub mod speculative_execution; +pub mod timestamp_generator; pub mod topology; pub use crate::frame::{Authenticator, Compression}; diff --git a/scylla/src/transport/timestamp_generator.rs b/scylla/src/transport/timestamp_generator.rs new file mode 100644 index 0000000000..2dac28031c --- /dev/null +++ b/scylla/src/transport/timestamp_generator.rs @@ -0,0 +1,93 @@ +use std::{ + ops::DerefMut, + sync::atomic::AtomicI64, + time::{SystemTime, UNIX_EPOCH}, +}; + +use futures::lock::Mutex; +use std::sync::atomic::Ordering; +use tokio::time::{Duration, Instant}; +use tracing::warn; + +pub(crate) trait TimestampGenerator { + async fn next_timestamp(&self) -> i64; +} + +pub struct MonotonicTimestampGenerator { + last: AtomicI64, + last_warning: Mutex, + warning_threshold_us: i64, + warning_interval_ms: i64, +} + +impl MonotonicTimestampGenerator { + pub fn new_with_settings(warning_threshold_us: i64, warning_interval_ms: i64) -> Self { + MonotonicTimestampGenerator { + last: AtomicI64::new(0), + last_warning: Mutex::new(Instant::now()), + warning_threshold_us, + warning_interval_ms, + } + } + pub fn new() -> Self { + MonotonicTimestampGenerator::new_with_settings(1000000, 1000) + } + + // This is guaranteed to return a monotonic timestamp. If clock skew is detected + // then this method will increment the last timestamp. + async fn compute_next(&self, last: i64) -> i64 { + let current = SystemTime::now().duration_since(UNIX_EPOCH); + if let Ok(cur_time) = current { + let u_cur = cur_time.as_micros() as i64; + if u_cur > last { + return u_cur; + } else if self.warning_threshold_us >= 0 && last - u_cur > self.warning_threshold_us { + let mut last_warn = self.last_warning.lock().await; + let now = Instant::now(); + if now + >= last_warn + .checked_add(Duration::from_millis(self.warning_interval_ms as u64)) + .unwrap() + { + *last_warn = now; + drop(last_warn); + warn!( + "Clock skew detected. The current time ({}) was {} \ + microseconds behind the last generated timestamp ({}). \ + The next generated timestamp will be artificially incremented \ + to guarantee monotonicity.", + u_cur, + last - u_cur, + last + ) + } + } + } else { + warn!("Clock skew detected. The current time was behind UNIX epoch."); + } + + last + 1 + } +} + +impl Default for MonotonicTimestampGenerator { + fn default() -> Self { + Self::new() + } +} + +impl TimestampGenerator for MonotonicTimestampGenerator { + async fn next_timestamp(&self) -> i64 { + loop { + let last = self.last.load(Ordering::SeqCst); + let cur = self.compute_next(last).await; + if self + .last + .compare_exchange(last, cur, Ordering::SeqCst, Ordering::SeqCst) + .is_ok() + { + return cur; + } + } + } +}