From 96d404217da3840a721c400829155e880acfa5bb Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 16 May 2024 10:08:59 -0400 Subject: [PATCH 1/4] All ttls as Durations --- limitador-server/src/envoy_rls/server.rs | 2 +- limitador-server/src/http_api/server.rs | 2 +- limitador/src/counter.rs | 4 +-- .../src/storage/atomic_expiring_value.rs | 22 +++++++------ limitador/src/storage/disk/expiring_value.rs | 12 ++++--- limitador/src/storage/disk/rocksdb_storage.rs | 2 +- limitador/src/storage/in_memory.rs | 32 ++++++------------- limitador/src/storage/keys.rs | 2 +- limitador/src/storage/redis/counters_cache.rs | 13 +++----- limitador/src/storage/redis/mod.rs | 4 +-- limitador/src/storage/redis/redis_async.rs | 4 +-- limitador/src/storage/redis/redis_cached.rs | 6 ++-- limitador/src/storage/redis/redis_sync.rs | 4 +-- 13 files changed, 51 insertions(+), 58 deletions(-) diff --git a/limitador-server/src/envoy_rls/server.rs b/limitador-server/src/envoy_rls/server.rs index 7735f8f6..e6d5f08f 100644 --- a/limitador-server/src/envoy_rls/server.rs +++ b/limitador-server/src/envoy_rls/server.rs @@ -175,7 +175,7 @@ pub fn to_response_header( let mut all_limits_text = String::with_capacity(20 * counters.len()); counters.iter_mut().for_each(|counter| { all_limits_text.push_str( - format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(), + format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(), ); if let Some(name) = counter.limit().name() { all_limits_text diff --git a/limitador-server/src/http_api/server.rs b/limitador-server/src/http_api/server.rs index ec0606b6..97937d69 100644 --- a/limitador-server/src/http_api/server.rs +++ b/limitador-server/src/http_api/server.rs @@ -253,7 +253,7 @@ pub fn add_response_header( let mut all_limits_text = String::with_capacity(20 * counters.len()); counters.iter_mut().for_each(|counter| { all_limits_text.push_str( - format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(), + format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(), ); if let Some(name) = counter.limit().name() { all_limits_text diff --git a/limitador/src/counter.rs b/limitador/src/counter.rs index 702d0c70..efd486e7 100644 --- a/limitador/src/counter.rs +++ b/limitador/src/counter.rs @@ -68,8 +68,8 @@ impl Counter { false } - pub fn seconds(&self) -> u64 { - self.limit.seconds() + pub fn window(&self) -> Duration { + Duration::from_secs(self.limit.seconds()) } pub fn namespace(&self) -> &Namespace { diff --git a/limitador/src/storage/atomic_expiring_value.rs b/limitador/src/storage/atomic_expiring_value.rs index 8b00c7bd..0353e041 100644 --- a/limitador/src/storage/atomic_expiring_value.rs +++ b/limitador/src/storage/atomic_expiring_value.rs @@ -33,7 +33,7 @@ impl AtomicExpiringValue { self.value.fetch_add(delta, Ordering::SeqCst) + delta } - pub fn update(&self, delta: u64, ttl: u64, when: SystemTime) -> u64 { + pub fn update(&self, delta: u64, ttl: Duration, when: SystemTime) -> u64 { if self.expiry.update_if_expired(ttl, when) { self.value.store(delta, Ordering::SeqCst); return delta; @@ -42,7 +42,7 @@ impl AtomicExpiringValue { } pub fn ttl(&self) -> Duration { - self.expiry.duration() + self.expiry.ttl() } } @@ -70,7 +70,7 @@ impl AtomicExpiryTime { .as_micros() as u64 } - pub fn duration(&self) -> Duration { + pub fn ttl(&self) -> Duration { let expiry = SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst)); expiry @@ -89,8 +89,8 @@ impl AtomicExpiryTime { .store(Self::since_epoch(expiry), Ordering::SeqCst); } - pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool { - let ttl_micros = ttl * 1_000_000; + pub fn update_if_expired(&self, ttl: Duration, when: SystemTime) -> bool { + let ttl_micros = u64::try_from(ttl.as_micros()).expect("Wow! The future is here!"); let when_micros = Self::since_epoch(when); let expiry = self.expiry.load(Ordering::SeqCst); if expiry <= when_micros { @@ -208,7 +208,7 @@ mod tests { fn updates_when_valid() { let now = SystemTime::now(); let val = AtomicExpiringValue::new(42, now + Duration::from_secs(1)); - val.update(3, 10, now); + val.update(3, Duration::from_secs(10), now); assert_eq!(val.value_at(now - Duration::from_secs(1)), 45); } @@ -217,7 +217,7 @@ mod tests { let now = SystemTime::now(); let val = AtomicExpiringValue::new(42, now); assert_eq!(val.ttl(), Duration::ZERO); - val.update(3, 10, now); + val.update(3, Duration::from_secs(10), now); assert_eq!(val.value_at(now - Duration::from_secs(1)), 3); } @@ -228,10 +228,14 @@ mod tests { thread::scope(|s| { s.spawn(|| { - atomic_expiring_value.update(1, 1, now); + atomic_expiring_value.update(1, Duration::from_secs(1), now); }); s.spawn(|| { - atomic_expiring_value.update(2, 1, now + Duration::from_secs(11)); + atomic_expiring_value.update( + 2, + Duration::from_secs(1), + now + Duration::from_secs(11), + ); }); }); assert!([2u64, 3u64].contains(&atomic_expiring_value.value.load(Ordering::SeqCst))); diff --git a/limitador/src/storage/disk/expiring_value.rs b/limitador/src/storage/disk/expiring_value.rs index 948a85db..2932d6db 100644 --- a/limitador/src/storage/disk/expiring_value.rs +++ b/limitador/src/storage/disk/expiring_value.rs @@ -25,9 +25,9 @@ impl ExpiringValue { } #[must_use] - pub fn update(self, delta: u64, ttl: u64, now: SystemTime) -> Self { + pub fn update(self, delta: u64, ttl: Duration, now: SystemTime) -> Self { let expiry = if self.expiry <= now { - now + Duration::from_secs(ttl) + now + ttl } else { self.expiry }; @@ -132,7 +132,11 @@ mod tests { #[test] fn updates_when_valid() { let now = SystemTime::now(); - let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now); + let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update( + 3, + Duration::from_secs(10), + now, + ); assert_eq!(val.value_at(now - Duration::from_secs(1)), 45); } @@ -141,7 +145,7 @@ mod tests { let now = SystemTime::now(); let val = ExpiringValue::new(42, now); assert_eq!(val.ttl(), Duration::ZERO); - let val = val.update(3, 10, now); + let val = val.update(3, Duration::from_secs(10), now); assert_eq!(val.value_at(now - Duration::from_secs(1)), 3); } diff --git a/limitador/src/storage/disk/rocksdb_storage.rs b/limitador/src/storage/disk/rocksdb_storage.rs index 4304af09..1e19c2c6 100644 --- a/limitador/src/storage/disk/rocksdb_storage.rs +++ b/limitador/src/storage/disk/rocksdb_storage.rs @@ -219,7 +219,7 @@ impl RocksDbStorage { let _entered = span.enter(); self.db .merge(key, >>::into(expiring_value))?; - return Ok(value.update(delta, counter.seconds(), now)); + return Ok(value.update(delta, counter.window(), now)); } Ok(value) } diff --git a/limitador/src/storage/in_memory.rs b/limitador/src/storage/in_memory.rs index b26d44d5..f32e2a22 100644 --- a/limitador/src/storage/in_memory.rs +++ b/limitador/src/storage/in_memory.rs @@ -56,36 +56,27 @@ impl CounterStorage for InMemoryStorage { if counter.is_qualified() { let value = match self.qualified_counters.get(counter) { None => self.qualified_counters.get_with(counter.clone(), || { - Arc::new(AtomicExpiringValue::new( - 0, - now + Duration::from_secs(counter.seconds()), - )) + Arc::new(AtomicExpiringValue::new(0, now + counter.window())) }), Some(counter) => counter, }; - value.update(delta, counter.seconds(), now); + value.update(delta, counter.window(), now); } else { match limits_by_namespace.entry(counter.limit().namespace().clone()) { Entry::Vacant(v) => { let mut limits = HashMap::new(); limits.insert( counter.limit().clone(), - AtomicExpiringValue::new( - delta, - now + Duration::from_secs(counter.seconds()), - ), + AtomicExpiringValue::new(delta, now + counter.window()), ); v.insert(limits); } Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) { Entry::Vacant(v) => { - v.insert(AtomicExpiringValue::new( - delta, - now + Duration::from_secs(counter.seconds()), - )); + v.insert(AtomicExpiringValue::new(delta, now + counter.window())); } Entry::Occupied(o) => { - o.get().update(delta, counter.seconds(), now); + o.get().update(delta, counter.window(), now); } }, } @@ -102,8 +93,8 @@ impl CounterStorage for InMemoryStorage { ) -> Result { let limits_by_namespace = self.limits_for_namespace.read().unwrap(); let mut first_limited = None; - let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new(); - let mut qualified_counter_values_to_updated: Vec<(Arc, u64)> = + let mut counter_values_to_update: Vec<(&AtomicExpiringValue, Duration)> = Vec::new(); + let mut qualified_counter_values_to_updated: Vec<(Arc, Duration)> = Vec::new(); let now = SystemTime::now(); @@ -138,17 +129,14 @@ impl CounterStorage for InMemoryStorage { return Ok(limited); } } - counter_values_to_update.push((atomic_expiring_value, counter.seconds())); + counter_values_to_update.push((atomic_expiring_value, counter.window())); } // Process qualified counters for counter in counters.iter_mut().filter(|c| c.is_qualified()) { let value = match self.qualified_counters.get(counter) { None => self.qualified_counters.get_with(counter.clone(), || { - Arc::new(AtomicExpiringValue::new( - 0, - now + Duration::from_secs(counter.seconds()), - )) + Arc::new(AtomicExpiringValue::new(0, now + counter.window())) }), Some(counter) => counter, }; @@ -159,7 +147,7 @@ impl CounterStorage for InMemoryStorage { } } - qualified_counter_values_to_updated.push((value, counter.seconds())); + qualified_counter_values_to_updated.push((value, counter.window())); } if let Some(limited) = first_limited { diff --git a/limitador/src/storage/keys.rs b/limitador/src/storage/keys.rs index 7b8a3596..6d32977c 100644 --- a/limitador/src/storage/keys.rs +++ b/limitador/src/storage/keys.rs @@ -153,7 +153,7 @@ pub mod bin { CounterKey { ns: counter.namespace().as_ref(), - seconds: counter.seconds(), + seconds: counter.window().as_secs(), conditions, variables: counter.variables_for_key(), } diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index ef924d92..06d9f7e9 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -27,7 +27,7 @@ impl CachedCounterValue { pub fn from_authority(counter: &Counter, value: u64) -> Self { let now = SystemTime::now(); Self { - value: AtomicExpiringValue::new(value, now + Duration::from_secs(counter.seconds())), + value: AtomicExpiringValue::new(value, now + counter.window()), initial_value: AtomicU64::new(value), from_authority: AtomicBool::new(true), } @@ -36,10 +36,7 @@ impl CachedCounterValue { pub fn load_from_authority_asap(counter: &Counter, temp_value: u64) -> Self { let now = SystemTime::now(); Self { - value: AtomicExpiringValue::new( - temp_value, - now + Duration::from_secs(counter.seconds()), - ), + value: AtomicExpiringValue::new(temp_value, now + counter.window()), initial_value: AtomicU64::new(0), from_authority: AtomicBool::new(false), } @@ -57,7 +54,7 @@ impl CachedCounterValue { pub fn delta(&self, counter: &Counter, delta: u64) -> u64 { let value = self .value - .update(delta, counter.seconds(), SystemTime::now()); + .update(delta, counter.window(), SystemTime::now()); if value == delta { // new window, invalidate initial value // which happens _after_ the self.value was reset, see `pending_writes` @@ -133,7 +130,7 @@ impl CachedCounterValue { self.hits(counter) as i128 + delta as i128 > counter.max_value() as i128 } - pub fn to_next_window(&self) -> Duration { + pub fn ttl(&self) -> Duration { self.value.ttl() } @@ -479,7 +476,7 @@ mod tests { let counter = test_counter(10, None); let value = CachedCounterValue::from_authority(&counter, 0); value.delta(&counter, hits); - assert!(value.to_next_window() > Duration::from_millis(59999)); + assert!(value.ttl() > Duration::from_millis(59999)); assert_eq!(value.hits(&counter), hits); let remaining = counter.max_value() - hits; assert_eq!(value.remaining(&counter), remaining); diff --git a/limitador/src/storage/redis/mod.rs b/limitador/src/storage/redis/mod.rs index 2b5c14e2..93167908 100644 --- a/limitador/src/storage/redis/mod.rs +++ b/limitador/src/storage/redis/mod.rs @@ -56,10 +56,10 @@ pub fn is_limited( if x >= 0 { Duration::from_millis(x as u64) } else { - Duration::from_secs(counter.seconds()) + counter.window() } }) - .unwrap_or(Duration::from_secs(counter.seconds())); + .unwrap_or(counter.window()); counter.set_expires_in(expires_in); if first_limited.is_none() && remaining.is_none() { diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index c7df1d0c..18175c75 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -52,7 +52,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key_for_counter(counter)) .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) + .arg(counter.window().as_secs()) .arg(delta) .invoke_async::<_, _>(&mut con) .instrument(debug_span!("datastore")) @@ -116,7 +116,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key) .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) + .arg(counter.window().as_secs()) .arg(delta) .invoke_async::<_, _>(&mut con) .instrument(debug_span!("datastore")) diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 736927b8..73daf6e3 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -93,7 +93,7 @@ impl AsyncCounterStorage for CachedRedisStorage { .checked_sub(delta) .unwrap_or_default(), ); - counter.set_expires_in(val.to_next_window()); + counter.set_expires_in(val.ttl()); } } _ => { @@ -114,7 +114,7 @@ impl AsyncCounterStorage for CachedRedisStorage { } if load_counters { counter.set_remaining(remaining - delta); - counter.set_expires_in(fake.to_next_window()); // todo: this is a plain lie! + counter.set_expires_in(fake.ttl()); // todo: this is a plain lie! } } } @@ -298,7 +298,7 @@ async fn update_counters( if delta > 0 { script_invocation.key(key_for_counter(&counter)); script_invocation.key(key_for_counters_of_limit(counter.limit())); - script_invocation.arg(counter.seconds()); + script_invocation.arg(counter.window().as_secs()); script_invocation.arg(delta); // We need to store the counter in the actual order we are sending it to the script res.push((counter, last_value_from_redis, delta, 0)); diff --git a/limitador/src/storage/redis/redis_sync.rs b/limitador/src/storage/redis/redis_sync.rs index 11113de1..002e9444 100644 --- a/limitador/src/storage/redis/redis_sync.rs +++ b/limitador/src/storage/redis/redis_sync.rs @@ -45,7 +45,7 @@ impl CounterStorage for RedisStorage { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key_for_counter(counter)) .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) + .arg(counter.window().as_secs()) .arg(delta) .invoke(&mut *con)?; @@ -97,7 +97,7 @@ impl CounterStorage for RedisStorage { redis::Script::new(SCRIPT_UPDATE_COUNTER) .key(key) .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.seconds()) + .arg(counter.window().as_secs()) .arg(delta) .invoke(&mut *con)?; } From c598b52bccf6d3fe4e99341eded1472ca3b9398f Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 16 May 2024 10:21:30 -0400 Subject: [PATCH 2/4] Expiries as SystemTimes --- .../src/storage/atomic_expiring_value.rs | 4 +- limitador/src/storage/redis/counters_cache.rs | 56 ++++++------------- limitador/src/storage/redis/redis_cached.rs | 33 +++++++---- 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/limitador/src/storage/atomic_expiring_value.rs b/limitador/src/storage/atomic_expiring_value.rs index 0353e041..f8d19ee0 100644 --- a/limitador/src/storage/atomic_expiring_value.rs +++ b/limitador/src/storage/atomic_expiring_value.rs @@ -28,8 +28,8 @@ impl AtomicExpiringValue { } #[allow(dead_code)] - pub fn add_and_set_expiry(&self, delta: u64, expire_at: SystemTime) -> u64 { - self.expiry.update(expire_at); + pub fn add_and_set_expiry(&self, delta: u64, expiry: SystemTime) -> u64 { + self.expiry.update(expiry); self.value.fetch_add(delta, Ordering::SeqCst) + delta } diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 06d9f7e9..94223cdc 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -304,32 +304,25 @@ impl CountersCache { counter: Counter, redis_val: u64, remote_deltas: u64, - redis_expiry: i64, + expiry: SystemTime, ) -> Arc { - if redis_expiry > 0 { - let expiry_ts = SystemTime::UNIX_EPOCH + Duration::from_millis(redis_expiry as u64); - if expiry_ts > SystemTime::now() { - let mut from_cache = true; - let cached = self.cache.get_with(counter.clone(), || { + if expiry > SystemTime::now() { + let mut from_cache = true; + let cached = self.cache.get_with(counter.clone(), || { + from_cache = false; + if let Some(entry) = self.batcher.updates.get(&counter) { gauge!("cache_size").increment(1); - from_cache = false; - if let Some(entry) = self.batcher.updates.get(&counter) { - let cached_value = entry.value(); - cached_value.add_from_authority( - remote_deltas, - expiry_ts, - counter.max_value(), - ); - cached_value.clone() - } else { - Arc::new(CachedCounterValue::from_authority(&counter, redis_val)) - } - }); - if from_cache { - cached.add_from_authority(remote_deltas, expiry_ts, counter.max_value()); + let cached_value = entry.value(); + cached_value.add_from_authority(remote_deltas, expiry, counter.max_value()); + cached_value.clone() + } else { + Arc::new(CachedCounterValue::from_authority(&counter, redis_val)) } - return cached; + }); + if from_cache { + cached.add_from_authority(remote_deltas, expiry, counter.max_value()); } + return cached; } Arc::new(CachedCounterValue::load_from_authority_asap( &counter, redis_val, @@ -392,7 +385,6 @@ impl CountersCacheBuilder { mod tests { use std::collections::HashMap; use std::ops::Add; - use std::time::UNIX_EPOCH; use crate::limit::Limit; @@ -620,11 +612,7 @@ mod tests { counter.clone(), 10, 0, - SystemTime::now() - .add(Duration::from_secs(1)) - .duration_since(UNIX_EPOCH) - .unwrap() - .as_micros() as i64, + SystemTime::now().add(Duration::from_secs(1)), ); assert!(cache.get(&counter).is_some()); @@ -650,11 +638,7 @@ mod tests { counter.clone(), current_value, 0, - SystemTime::now() - .add(Duration::from_secs(1)) - .duration_since(UNIX_EPOCH) - .unwrap() - .as_micros() as i64, + SystemTime::now().add(Duration::from_secs(1)), ); assert_eq!( @@ -674,11 +658,7 @@ mod tests { counter.clone(), current_val, 0, - SystemTime::now() - .add(Duration::from_secs(1)) - .duration_since(UNIX_EPOCH) - .unwrap() - .as_micros() as i64, + SystemTime::now().add(Duration::from_secs(1)), ); cache.increase_by(&counter, increase_by).await; diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 73daf6e3..fbf4aa89 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tracing::{debug_span, error, info, warn, Instrument}; // This is just a first version. @@ -282,14 +282,16 @@ impl CachedRedisStorageBuilder { async fn update_counters( redis_conn: &mut C, counters_and_deltas: HashMap>, -) -> Result, (Vec<(Counter, u64, u64, i64)>, StorageErr)> { +) -> Result, (Vec<(Counter, u64, u64, SystemTime)>, StorageErr)> +{ let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS); let mut script_invocation = redis_script.prepare_invoke(); let res = if counters_and_deltas.is_empty() { Default::default() } else { - let mut res: Vec<(Counter, u64, u64, i64)> = Vec::with_capacity(counters_and_deltas.len()); + let mut res: Vec<(Counter, u64, u64, SystemTime)> = + Vec::with_capacity(counters_and_deltas.len()); for (counter, value) in counters_and_deltas { let (delta, last_value_from_redis) = value @@ -301,7 +303,7 @@ async fn update_counters( script_invocation.arg(counter.window().as_secs()); script_invocation.arg(delta); // We need to store the counter in the actual order we are sending it to the script - res.push((counter, last_value_from_redis, delta, 0)); + res.push((counter, last_value_from_redis, delta, UNIX_EPOCH)); } } @@ -327,7 +329,8 @@ async fn update_counters( .unwrap_or(0) .saturating_sub(*val); // new value - previous one = remote writes *val = u64::try_from(script_res[j]).unwrap_or(0); // update to value to newest - *expires_at = script_res[j + 1]; + *expires_at = + UNIX_EPOCH + Duration::from_millis(u64::try_from(script_res[j + 1]).unwrap_or(0)); } res }; @@ -447,13 +450,15 @@ mod tests { arc.delta(&counter, LOCAL_INCREMENTS); counters_and_deltas.insert(counter.clone(), arc); - let one_sec_from_now = SystemTime::now() - .add(Duration::from_secs(1)) - .duration_since(UNIX_EPOCH) - .unwrap(); + let one_sec_from_now = SystemTime::now().add(Duration::from_secs(1)); let mock_response = Value::Bulk(vec![ Value::Int(NEW_VALUE_FROM_REDIS as i64), - Value::Int(one_sec_from_now.as_millis() as i64), + Value::Int( + one_sec_from_now + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as i64, + ), ]); let mut mock_client = MockRedisConnection::new(vec![MockCmd::new( @@ -478,7 +483,13 @@ mod tests { NEW_VALUE_FROM_REDIS - INITIAL_VALUE_FROM_REDIS - LOCAL_INCREMENTS, remote_increments ); - assert_eq!(one_sec_from_now.as_millis(), expire_at as u128); + assert_eq!( + one_sec_from_now + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(), + expire_at.duration_since(UNIX_EPOCH).unwrap().as_millis() + ); } #[tokio::test] From d15ac9a0a8176fd22c51bf408c3c7410073c87db Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Thu, 16 May 2024 10:33:10 -0400 Subject: [PATCH 3/4] Some feature clean ups --- limitador/src/counter.rs | 1 + limitador/src/storage/atomic_expiring_value.rs | 9 ++------- limitador/src/storage/redis/counters_cache.rs | 1 + limitador/tests/helpers/tests_limiter.rs | 4 ++-- limitador/tests/integration_tests.rs | 3 +++ 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/limitador/src/counter.rs b/limitador/src/counter.rs index efd486e7..9763d627 100644 --- a/limitador/src/counter.rs +++ b/limitador/src/counter.rs @@ -40,6 +40,7 @@ impl Counter { } } + #[cfg(any(feature = "redis_storage", feature = "disk_storage"))] pub(crate) fn key(&self) -> Self { Self { limit: self.limit.clone(), diff --git a/limitador/src/storage/atomic_expiring_value.rs b/limitador/src/storage/atomic_expiring_value.rs index f8d19ee0..c661ca08 100644 --- a/limitador/src/storage/atomic_expiring_value.rs +++ b/limitador/src/storage/atomic_expiring_value.rs @@ -27,7 +27,7 @@ impl AtomicExpiringValue { self.value_at(SystemTime::now()) } - #[allow(dead_code)] + #[cfg(feature = "redis_storage")] pub fn add_and_set_expiry(&self, delta: u64, expiry: SystemTime) -> u64 { self.expiry.update(expiry); self.value.fetch_add(delta, Ordering::SeqCst) + delta @@ -59,11 +59,6 @@ impl AtomicExpiryTime { } } - #[allow(dead_code)] - pub fn from_now(ttl: Duration) -> Self { - Self::new(SystemTime::now() + ttl) - } - fn since_epoch(when: SystemTime) -> u64 { when.duration_since(UNIX_EPOCH) .expect("SystemTime before UNIX EPOCH!") @@ -83,7 +78,7 @@ impl AtomicExpiryTime { self.expiry.load(Ordering::SeqCst) <= when } - #[allow(dead_code)] + #[cfg(feature = "redis_storage")] pub fn update(&self, expiry: SystemTime) { self.expiry .store(Self::since_epoch(expiry), Ordering::SeqCst); diff --git a/limitador/src/storage/redis/counters_cache.rs b/limitador/src/storage/redis/counters_cache.rs index 94223cdc..35152537 100644 --- a/limitador/src/storage/redis/counters_cache.rs +++ b/limitador/src/storage/redis/counters_cache.rs @@ -42,6 +42,7 @@ impl CachedCounterValue { } } + #[cfg(feature = "redis_storage")] pub fn add_from_authority(&self, delta: u64, expire_at: SystemTime, max_value: u64) { let new_val = self.value.add_and_set_expiry(delta, expire_at); if new_val > max_value { diff --git a/limitador/tests/helpers/tests_limiter.rs b/limitador/tests/helpers/tests_limiter.rs index b7bc4ca9..2bae0c3e 100644 --- a/limitador/tests/helpers/tests_limiter.rs +++ b/limitador/tests/helpers/tests_limiter.rs @@ -10,7 +10,7 @@ use std::collections::{HashMap, HashSet}; enum LimiterImpl { Blocking(RateLimiter), - #[allow(dead_code)] // dead when no "redis_storage" + #[cfg(feature = "redis_storage")] Async(AsyncRateLimiter), } @@ -25,7 +25,7 @@ impl TestsLimiter { } } - #[allow(dead_code)] // dead when no "redis_storage" + #[cfg(feature = "redis_storage")] pub fn new_from_async_impl(limiter: AsyncRateLimiter) -> Self { Self { limiter_impl: LimiterImpl::Async(limiter), diff --git a/limitador/tests/integration_tests.rs b/limitador/tests/integration_tests.rs index 9a24663a..7e0e6595 100644 --- a/limitador/tests/integration_tests.rs +++ b/limitador/tests/integration_tests.rs @@ -21,6 +21,7 @@ macro_rules! test_with_all_storage_impls { $function(&mut TestsLimiter::new_from_blocking_impl(rate_limiter)).await; } + #[cfg(feature = "disk_storage")] #[tokio::test] async fn [<$function _disk_storage>]() { let dir = TempDir::new().expect("We should have a dir!"); @@ -159,6 +160,7 @@ mod test { use self::limitador::RateLimiter; use crate::helpers::tests_limiter::*; use limitador::limit::Limit; + #[cfg(feature = "disk_storage")] use limitador::storage::disk::{DiskStorage, OptimizeFor}; #[cfg(feature = "distributed_storage")] use limitador::storage::distributed::CrInMemoryStorage; @@ -167,6 +169,7 @@ mod test { use std::future::Future; use std::thread::sleep; use std::time::Duration; + #[cfg(feature = "disk_storage")] use tempfile::TempDir; use tokio::time::error::Elapsed; From 709fca1886dc74eb2c2b39b27246cb37859114cc Mon Sep 17 00:00:00 2001 From: Alex Snaps Date: Fri, 17 May 2024 12:45:09 -0400 Subject: [PATCH 4/4] Distributed storage to use the new sigs --- .../src/storage/distributed/cr_counter_value.rs | 13 +++++-------- limitador/src/storage/distributed/mod.rs | 17 ++++++++--------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/limitador/src/storage/distributed/cr_counter_value.rs b/limitador/src/storage/distributed/cr_counter_value.rs index eb6fc1fb..e76b4b05 100644 --- a/limitador/src/storage/distributed/cr_counter_value.rs +++ b/limitador/src/storage/distributed/cr_counter_value.rs @@ -20,7 +20,7 @@ impl CrCounterValue { ourselves: actor, value: Default::default(), others: RwLock::default(), - expiry: AtomicExpiryTime::from_now(time_window), + expiry: AtomicExpiryTime::new(SystemTime::now() + time_window), } } @@ -43,7 +43,7 @@ impl CrCounterValue { } pub fn inc_at(&self, increment: u64, time_window: Duration, when: SystemTime) { - if self.expiry.update_if_expired(time_window.as_secs(), when) { + if self.expiry.update_if_expired(time_window, when) { self.value.store(increment, Ordering::SeqCst); } else { self.value.fetch_add(increment, Ordering::SeqCst); @@ -59,10 +59,7 @@ impl CrCounterValue { self.inc_at(increment, time_window, when); } else { let mut guard = self.others.write().unwrap(); - if self - .expiry - .update_if_expired(time_window.as_micros() as u64, when) - { + if self.expiry.update_if_expired(time_window, when) { guard.insert(actor, increment); } else { *guard.entry(actor).or_insert(0) += increment; @@ -109,7 +106,7 @@ impl CrCounterValue { } pub fn ttl(&self) -> Duration { - self.expiry.duration() + self.expiry.ttl() } pub fn expiry(&self) -> SystemTime { @@ -282,6 +279,6 @@ mod tests { a.inc(3, later); b.inc(2, later); a.merge(b); - assert!(a.expiry.duration() < sooner); + assert!(a.expiry.ttl() < sooner); } } diff --git a/limitador/src/storage/distributed/mod.rs b/limitador/src/storage/distributed/mod.rs index 2c319576..7a9e7761 100644 --- a/limitador/src/storage/distributed/mod.rs +++ b/limitador/src/storage/distributed/mod.rs @@ -72,7 +72,7 @@ impl CounterStorage for CrInMemoryStorage { None => self.qualified_counters.get_with(counter.clone(), || { Arc::new(CrCounterValue::new( self.identifier.clone(), - Duration::from_secs(counter.seconds()), + counter.window(), )) }), Some(counter) => counter, @@ -82,16 +82,16 @@ impl CounterStorage for CrInMemoryStorage { match limits_by_namespace.entry(counter.limit().namespace().clone()) { Entry::Vacant(v) => { let mut limits = HashMap::new(); - let duration = Duration::from_secs(counter.seconds()); - let counter_val = CrCounterValue::new(self.identifier.clone(), duration); + let counter_val = + CrCounterValue::new(self.identifier.clone(), counter.window()); self.increment_counter(counter.clone(), &counter_val, delta, now); limits.insert(counter.limit().clone(), counter_val); v.insert(limits); } Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) { Entry::Vacant(v) => { - let duration = Duration::from_secs(counter.seconds()); - let counter_value = CrCounterValue::new(self.identifier.clone(), duration); + let counter_value = + CrCounterValue::new(self.identifier.clone(), counter.window()); self.increment_counter(counter.clone(), &counter_value, delta, now); v.insert(counter_value); } @@ -158,7 +158,7 @@ impl CounterStorage for CrInMemoryStorage { None => self.qualified_counters.get_with(counter.clone(), || { Arc::new(CrCounterValue::new( self.identifier.clone(), - Duration::from_secs(counter.seconds()), + counter.window(), )) }), Some(counter) => counter, @@ -338,8 +338,7 @@ impl CrInMemoryStorage { delta: u64, when: SystemTime, ) { - counter.inc_at(delta, Duration::from_secs(key.seconds()), when); - + counter.inc_at(delta, key.window(), when); let counter = counter.clone(); let (expiry, values) = counter.into_inner(); let key: CounterKey = key.into(); @@ -366,7 +365,7 @@ impl From for CounterKey { fn from(value: Counter) -> Self { Self { namespace: value.namespace().clone(), - seconds: value.seconds(), + seconds: value.window().as_secs(), variables: value.limit().variables(), conditions: value.limit().conditions(), vars: value.set_variables().clone(),