From 33d9762dc8346a2b43192ae1cde3d3a00f047e60 Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Tue, 14 May 2024 14:46:17 -0400 Subject: [PATCH] Wired distributed storage in --- limitador-server/src/config.rs | 9 +++ limitador-server/src/main.rs | 63 ++++++++++++++++++- limitador/src/lib.rs | 2 +- .../storage/distributed/cr_counter_value.rs | 5 +- limitador/src/storage/mod.rs | 14 +---- 5 files changed, 75 insertions(+), 18 deletions(-) diff --git a/limitador-server/src/config.rs b/limitador-server/src/config.rs index e84bfbea..8309b6fe 100644 --- a/limitador-server/src/config.rs +++ b/limitador-server/src/config.rs @@ -141,6 +141,7 @@ pub enum StorageConfiguration { InMemory(InMemoryStorageConfiguration), Disk(DiskStorageConfiguration), Redis(RedisStorageConfiguration), + Distributed(DistributedStorageConfiguration), #[cfg(feature = "infinispan")] Infinispan(InfinispanStorageConfiguration), } @@ -150,6 +151,14 @@ pub struct InMemoryStorageConfiguration { pub cache_size: Option, } +#[derive(PartialEq, Eq, Debug)] +pub struct DistributedStorageConfiguration { + pub name: String, + pub cache_size: Option, + pub local: String, + pub broadcast: String, +} + #[derive(PartialEq, Eq, Debug)] pub struct DiskStorageConfiguration { pub path: String, diff --git a/limitador-server/src/main.rs b/limitador-server/src/main.rs index 818d7da7..ffceb9d9 100644 --- a/limitador-server/src/main.rs +++ b/limitador-server/src/main.rs @@ -8,8 +8,9 @@ extern crate clap; #[cfg(feature = "infinispan")] use crate::config::InfinispanStorageConfiguration; use crate::config::{ - Configuration, DiskStorageConfiguration, InMemoryStorageConfiguration, - RedisStorageCacheConfiguration, RedisStorageConfiguration, StorageConfiguration, + Configuration, DiskStorageConfiguration, DistributedStorageConfiguration, + InMemoryStorageConfiguration, RedisStorageCacheConfiguration, RedisStorageConfiguration, + StorageConfiguration, }; use crate::envoy_rls::server::{run_envoy_rls_server, RateLimitHeaders}; use crate::http_api::server::run_http_server; @@ -27,6 +28,7 @@ use limitador::storage::redis::{ AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_BATCH_SIZE, DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS, }; +use limitador::storage::DistributedInMemoryStorage; use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage}; use limitador::{ storage, AsyncRateLimiter, AsyncRateLimiterBuilder, RateLimiter, RateLimiterBuilder, @@ -93,6 +95,7 @@ impl Limiter { #[cfg(feature = "infinispan")] StorageConfiguration::Infinispan(cfg) => Self::infinispan_limiter(cfg).await, StorageConfiguration::InMemory(cfg) => Self::in_memory_limiter(cfg), + StorageConfiguration::Distributed(cfg) => Self::distributed_limiter(cfg), StorageConfiguration::Disk(cfg) => Self::disk_limiter(cfg), }; @@ -207,6 +210,19 @@ impl Limiter { Self::Blocking(rate_limiter_builder.build()) } + fn distributed_limiter(cfg: DistributedStorageConfiguration) -> Self { + let storage = DistributedInMemoryStorage::new( + cfg.name, + cfg.cache_size.or_else(guess_cache_size).unwrap(), + cfg.local, + cfg.broadcast, + ); + let rate_limiter_builder = + RateLimiterBuilder::with_storage(Storage::with_counter_storage(Box::new(storage))); + + Self::Blocking(rate_limiter_builder.build()) + } + pub async fn load_limits_from_file>( &self, path: &P, @@ -631,6 +647,41 @@ fn create_config() -> (Configuration, &'static str) { .display_order(6) .help("Timeout for Redis commands in milliseconds"), ), + ) + .subcommand( + Command::new("distributed") + .about("Replicates CRDT-based counters across multiple Limitador servers") + .display_order(5) + .arg( + Arg::new("NAME") + .action(ArgAction::Set) + .required(true) + .display_order(2) + .help("Unique name to identify this Limitador instance"), + ) + .arg( + Arg::new("LOCAL") + .action(ArgAction::Set) + .required(true) + .display_order(2) + .help("Local IP:PORT to send datagrams from"), + ) + .arg( + Arg::new("BROADCAST") + .action(ArgAction::Set) + .required(true) + .display_order(3) + .help("Broadcast IP:PORT to send datagrams to"), + ) + .arg( + Arg::new("CACHE_SIZE") + .long("cache") + .short('c') + .action(ArgAction::Set) + .value_parser(value_parser!(u64)) + .display_order(4) + .help("Sets the size of the cache for 'qualified counters'"), + ), ); #[cfg(feature = "infinispan")] @@ -750,6 +801,14 @@ fn create_config() -> (Configuration, &'static str) { Some(("memory", sub)) => StorageConfiguration::InMemory(InMemoryStorageConfiguration { cache_size: sub.get_one::("CACHE_SIZE").copied(), }), + Some(("distributed", sub)) => { + StorageConfiguration::Distributed(DistributedStorageConfiguration { + name: sub.get_one::("NAME").unwrap().to_owned(), + local: sub.get_one::("LOCAL").unwrap().to_owned(), + broadcast: sub.get_one::("BROADCAST").unwrap().to_owned(), + cache_size: sub.get_one::("CACHE_SIZE").copied(), + }) + } None => match storage_config_from_env() { Ok(storage_cfg) => storage_cfg, Err(_) => { diff --git a/limitador/src/lib.rs b/limitador/src/lib.rs index d95a0f22..25010829 100644 --- a/limitador/src/lib.rs +++ b/limitador/src/lib.rs @@ -197,7 +197,7 @@ use std::collections::{HashMap, HashSet}; use crate::counter::Counter; use crate::errors::LimitadorError; use crate::limit::{Limit, Namespace}; -use crate::storage::distributed::CrInMemoryStorage as InMemoryStorage; +use crate::storage::in_memory::InMemoryStorage; use crate::storage::{AsyncCounterStorage, AsyncStorage, Authorization, CounterStorage, Storage}; #[macro_use] diff --git a/limitador/src/storage/distributed/cr_counter_value.rs b/limitador/src/storage/distributed/cr_counter_value.rs index 2c38e9a7..eb6fc1fb 100644 --- a/limitador/src/storage/distributed/cr_counter_value.rs +++ b/limitador/src/storage/distributed/cr_counter_value.rs @@ -43,10 +43,7 @@ impl CrCounterValue { } pub fn inc_at(&self, increment: u64, time_window: Duration, when: SystemTime) { - if self - .expiry - .update_if_expired(time_window.as_secs(), when) - { + if self.expiry.update_if_expired(time_window.as_secs(), when) { self.value.store(increment, Ordering::SeqCst); } else { self.value.fetch_add(increment, Ordering::SeqCst); diff --git a/limitador/src/storage/mod.rs b/limitador/src/storage/mod.rs index 4dd207e7..25c66f96 100644 --- a/limitador/src/storage/mod.rs +++ b/limitador/src/storage/mod.rs @@ -3,7 +3,6 @@ use crate::limit::{Limit, Namespace}; use crate::InMemoryStorage; use async_trait::async_trait; use std::collections::{HashMap, HashSet}; -use std::env; use std::sync::RwLock; use thiserror::Error; @@ -13,6 +12,8 @@ pub mod distributed; pub mod in_memory; pub mod wasm; +pub use crate::storage::distributed::CrInMemoryStorage as DistributedInMemoryStorage; + #[cfg(feature = "redis_storage")] pub mod redis; @@ -44,18 +45,9 @@ pub struct AsyncStorage { impl Storage { pub fn new(cache_size: u64) -> Self { - let local = - env::var("LOCAL").expect("We need the env var LOCAL to be set to your local :port"); - let broadcast = env::var("BROADCAST") - .expect("We need the env var BROADCAST to be set to your broadcast :port"); Self { limits: RwLock::new(HashMap::new()), - counters: Box::new(InMemoryStorage::new( - local.to_owned(), - cache_size, - local, - broadcast, - )), + counters: Box::new(InMemoryStorage::new(cache_size)), } }