Skip to content

Commit

Permalink
Expiries as SystemTimes
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 21, 2024
1 parent c7ee368 commit ebd9806
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 51 deletions.
4 changes: 2 additions & 2 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ impl AtomicExpiringValue {
}

#[allow(dead_code)]
pub fn add_and_set_expiry(&self, delta: u64, expire_at: SystemTime) -> u64 {
self.expiry.update(expire_at);
pub fn add_and_set_expiry(&self, delta: u64, expiry: SystemTime) -> u64 {
self.expiry.update(expiry);
self.value.fetch_add(delta, Ordering::SeqCst) + delta
}

Expand Down
56 changes: 18 additions & 38 deletions limitador/src/storage/redis/counters_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -254,32 +254,25 @@ impl CountersCache {
counter: Counter,
redis_val: u64,
remote_deltas: u64,
redis_expiry: i64,
expiry: SystemTime,
) -> Arc<CachedCounterValue> {
if redis_expiry > 0 {
let expiry_ts = SystemTime::UNIX_EPOCH + Duration::from_millis(redis_expiry as u64);
if expiry_ts > SystemTime::now() {
let mut from_cache = true;
let cached = self.cache.get_with(counter.clone(), || {
if expiry > SystemTime::now() {
let mut from_cache = true;
let cached = self.cache.get_with(counter.clone(), || {
from_cache = false;
if let Some(entry) = self.batcher.updates.get(&counter) {
gauge!("cache_size").increment(1);
from_cache = false;
if let Some(entry) = self.batcher.updates.get(&counter) {
let cached_value = entry.value();
cached_value.add_from_authority(
remote_deltas,
expiry_ts,
counter.max_value(),
);
cached_value.clone()
} else {
Arc::new(CachedCounterValue::from_authority(&counter, redis_val))
}
});
if from_cache {
cached.add_from_authority(remote_deltas, expiry_ts, counter.max_value());
let cached_value = entry.value();
cached_value.add_from_authority(remote_deltas, expiry, counter.max_value());
cached_value.clone()
} else {
Arc::new(CachedCounterValue::from_authority(&counter, redis_val))
}
return cached;
});
if from_cache {
cached.add_from_authority(remote_deltas, expiry, counter.max_value());
}
return cached;
}
Arc::new(CachedCounterValue::load_from_authority_asap(
&counter, redis_val,
Expand Down Expand Up @@ -342,7 +335,6 @@ impl CountersCacheBuilder {
mod tests {
use std::collections::HashMap;
use std::ops::Add;
use std::time::UNIX_EPOCH;

use crate::limit::Limit;

Expand Down Expand Up @@ -565,11 +557,7 @@ mod tests {
counter.clone(),
10,
0,
SystemTime::now()
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as i64,
SystemTime::now().add(Duration::from_secs(1)),
);

assert!(cache.get(&counter).is_some());
Expand All @@ -595,11 +583,7 @@ mod tests {
counter.clone(),
current_value,
0,
SystemTime::now()
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as i64,
SystemTime::now().add(Duration::from_secs(1)),
);

assert_eq!(
Expand All @@ -619,11 +603,7 @@ mod tests {
counter.clone(),
current_val,
0,
SystemTime::now()
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap()
.as_micros() as i64,
SystemTime::now().add(Duration::from_secs(1)),
);
cache.increase_by(&counter, increase_by).await;

Expand Down
32 changes: 21 additions & 11 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tracing::{debug_span, error, warn, Instrument};

// This is just a first version.
Expand Down Expand Up @@ -284,14 +284,15 @@ impl CachedRedisStorageBuilder {
async fn update_counters<C: ConnectionLike>(
redis_conn: &mut C,
counters_and_deltas: HashMap<Counter, Arc<CachedCounterValue>>,
) -> Result<Vec<(Counter, u64, u64, i64)>, StorageErr> {
) -> Result<Vec<(Counter, u64, u64, SystemTime)>, StorageErr> {
let redis_script = redis::Script::new(BATCH_UPDATE_COUNTERS);
let mut script_invocation = redis_script.prepare_invoke();

let res = if counters_and_deltas.is_empty() {
Default::default()
} else {
let mut res: Vec<(Counter, u64, u64, i64)> = Vec::with_capacity(counters_and_deltas.len());
let mut res: Vec<(Counter, u64, u64, SystemTime)> =
Vec::with_capacity(counters_and_deltas.len());

for (counter, value) in counters_and_deltas {
let (delta, last_value_from_redis) = value
Expand All @@ -303,7 +304,7 @@ async fn update_counters<C: ConnectionLike>(
script_invocation.arg(counter.window().as_secs());
script_invocation.arg(delta);
// We need to store the counter in the actual order we are sending it to the script
res.push((counter, 0, last_value_from_redis, 0));
res.push((counter, 0, last_value_from_redis, UNIX_EPOCH));
}
}

Expand All @@ -323,7 +324,8 @@ async fn update_counters<C: ConnectionLike>(
*delta = u64::try_from(script_res[j])
.unwrap_or(0)
.saturating_sub(*delta);
*expires_at = script_res[j + 1];
*expires_at =
UNIX_EPOCH + Duration::from_millis(u64::try_from(script_res[j + 1]).unwrap_or(0));
}
res
};
Expand Down Expand Up @@ -424,13 +426,15 @@ mod tests {
arc.delta(&counter, LOCAL_INCREMENTS);
counters_and_deltas.insert(counter.clone(), arc);

let one_sec_from_now = SystemTime::now()
.add(Duration::from_secs(1))
.duration_since(UNIX_EPOCH)
.unwrap();
let one_sec_from_now = SystemTime::now().add(Duration::from_secs(1));
let mock_response = Value::Bulk(vec![
Value::Int(NEW_VALUE_FROM_REDIS as i64),
Value::Int(one_sec_from_now.as_millis() as i64),
Value::Int(
one_sec_from_now
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis() as i64,
),
]);

let mut mock_client = MockRedisConnection::new(vec![MockCmd::new(
Expand All @@ -455,7 +459,13 @@ mod tests {
NEW_VALUE_FROM_REDIS - INITIAL_VALUE_FROM_REDIS - LOCAL_INCREMENTS,
remote_increments
);
assert_eq!(one_sec_from_now.as_millis(), expire_at as u128);
assert_eq!(
one_sec_from_now
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis(),
expire_at.duration_since(UNIX_EPOCH).unwrap().as_millis()
);
}

#[tokio::test]
Expand Down

0 comments on commit ebd9806

Please sign in to comment.