diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index d527bd1f..fc0688ef 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -14,6 +14,7 @@ use std::sync::Arc; use std::time::{Duration, SystemTime}; use tokio::select; use tokio::sync::{Notify, Semaphore}; +use tracing::info; #[derive(Debug)] pub struct CachedCounterValue { @@ -203,17 +204,26 @@ impl Batcher { return result; } else { ready = select! { - _ = self.notifier.notified() => self.batch_ready(max), - _ = tokio::time::sleep(self.interval) => true, + _ = async { + loop { + self.notifier.notified().await; + if self.batch_ready(max) { + break; + } + } + } => { + info!("Priority flush!"); + true + }, + _ = tokio::time::sleep(self.interval) => { + // info!("Time limit hit!"); + true + }, } } } } - pub fn is_empty(&self) -> bool { - self.updates.is_empty() - } - fn batch_ready(&self, size: usize) -> bool { self.updates.len() >= size || self diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index d77a16bd..c8adf009 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -14,7 +14,7 @@ use redis::{AsyncCommands, RedisError}; use std::collections::HashSet; use std::str::FromStr; use std::time::{Duration, Instant}; -use tracing::{debug_span, Instrument}; +use tracing::{debug_span, warn, Instrument}; // Note: this implementation does not guarantee exact limits. Ensuring that we // never go over the limits would hurt performance. 
This implementation @@ -215,6 +215,14 @@ impl AsyncRedisStorage { .clone() .incr::<&str, i32, u64>("LIMITADOR_LIVE_CHECK", 1) .await + .map_err(|err| { + let err = <RedisError as Into<StorageErr>>::into(err); + if !err.is_transient() { + panic!("Unrecoverable Redis error: {}", err); + } + warn!("Live check failure: {}", err); + err + }) .is_ok() .then(|| histogram!("liveness_latency").record(now.elapsed().as_secs_f64())) .is_some() diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 992f2a81..9bbafbf4 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -17,10 +17,11 @@ use redis::aio::{ConnectionLike, ConnectionManager}; use redis::{ConnectionInfo, RedisError}; use std::collections::{HashMap, HashSet}; use std::str::FromStr; +use std::sync::atomic::Ordering::Acquire; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; -use tracing::{debug_span, error, warn, Instrument}; +use tracing::{debug_span, error, info, warn, Instrument}; // This is just a first version.
// @@ -189,7 +190,6 @@ impl CachedRedisStorage { AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()); { - let storage = async_redis_storage.clone(); let counters_cache_clone = counters_cache.clone(); let conn = redis_conn_manager.clone(); let p = Arc::clone(&partitioned); @@ -197,7 +197,6 @@ impl CachedRedisStorage { loop { flush_batcher_and_update_counters( conn.clone(), - storage.is_alive().await, counters_cache_clone.clone(), p.clone(), batch_size, @@ -333,35 +332,38 @@ async fn update_counters( #[tracing::instrument(skip_all)] async fn flush_batcher_and_update_counters<C: ConnectionLike>( mut redis_conn: C, - storage_is_alive: bool, cached_counters: Arc<CountersCache>, partitioned: Arc<AtomicBool>, batch_size: usize, ) { - if partitioned.load(Ordering::Acquire) || !storage_is_alive { - if !cached_counters.batcher().is_empty() { + let updated_counters = cached_counters + .batcher() + .consume(batch_size, |counters| { + if !counters.is_empty() && !partitioned.load(Acquire) { + info!("Flushing {} counter updates", counters.len()); + } + update_counters(&mut redis_conn, counters) + }) + .await + .map(|res| { + // info!("Success {} counters", res.len()); flip_partitioned(&partitioned, false); - } - } else { - let updated_counters = cached_counters - .batcher() - .consume(batch_size, |counters| { - update_counters(&mut redis_conn, counters) - }) - .await - .or_else(|err| { - if err.is_transient() { - flip_partitioned(&partitioned, true); - Ok(Vec::new()) - } else { - Err(err) + res + }) + .or_else(|err| { + if err.is_transient() { + if flip_partitioned(&partitioned, true) { + warn!("Error flushing {}", err); } - }) - .expect("Unrecoverable Redis error!"); + Ok(Vec::new()) + } else { + Err(err) + } + }) + .expect("Unrecoverable Redis error!"); - for (counter, new_value, remote_deltas, ttl) in updated_counters { - cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl); - } + for (counter, new_value, remote_deltas, ttl) in updated_counters { + 
cached_counters.apply_remote_delta(counter, new_value, remote_deltas, ttl); } }