Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ttl expiry #334

Merged
merged 4 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion limitador-server/src/envoy_rls/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ pub fn to_response_header(
let mut all_limits_text = String::with_capacity(20 * counters.len());
counters.iter_mut().for_each(|counter| {
all_limits_text.push_str(
format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(),
format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(),
);
if let Some(name) = counter.limit().name() {
all_limits_text
Expand Down
2 changes: 1 addition & 1 deletion limitador-server/src/http_api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ pub fn add_response_header(
let mut all_limits_text = String::with_capacity(20 * counters.len());
counters.iter_mut().for_each(|counter| {
all_limits_text.push_str(
format!(", {};w={}", counter.max_value(), counter.seconds()).as_str(),
format!(", {};w={}", counter.max_value(), counter.window().as_secs()).as_str(),
);
if let Some(name) = counter.limit().name() {
all_limits_text
Expand Down
5 changes: 3 additions & 2 deletions limitador/src/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ impl Counter {
}
}

#[cfg(any(feature = "redis_storage", feature = "disk_storage"))]
pub(crate) fn key(&self) -> Self {
Self {
limit: self.limit.clone(),
Expand Down Expand Up @@ -68,8 +69,8 @@ impl Counter {
false
}

pub fn seconds(&self) -> u64 {
self.limit.seconds()
pub fn window(&self) -> Duration {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is great, it was always too ambiguous to refer to seconds

Duration::from_secs(self.limit.seconds())
}

pub fn namespace(&self) -> &Namespace {
Expand Down
35 changes: 17 additions & 18 deletions limitador/src/storage/atomic_expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ impl AtomicExpiringValue {
self.value_at(SystemTime::now())
}

#[allow(dead_code)]
pub fn add_and_set_expiry(&self, delta: u64, expire_at: SystemTime) -> u64 {
self.expiry.update(expire_at);
#[cfg(feature = "redis_storage")]
pub fn add_and_set_expiry(&self, delta: u64, expiry: SystemTime) -> u64 {
self.expiry.update(expiry);
self.value.fetch_add(delta, Ordering::SeqCst) + delta
}

pub fn update(&self, delta: u64, ttl: u64, when: SystemTime) -> u64 {
pub fn update(&self, delta: u64, ttl: Duration, when: SystemTime) -> u64 {
if self.expiry.update_if_expired(ttl, when) {
self.value.store(delta, Ordering::SeqCst);
return delta;
Expand All @@ -42,7 +42,7 @@ impl AtomicExpiringValue {
}

pub fn ttl(&self) -> Duration {
self.expiry.duration()
self.expiry.ttl()
}
}

Expand All @@ -59,18 +59,13 @@ impl AtomicExpiryTime {
}
}

#[allow(dead_code)]
pub fn from_now(ttl: Duration) -> Self {
Self::new(SystemTime::now() + ttl)
}

fn since_epoch(when: SystemTime) -> u64 {
when.duration_since(UNIX_EPOCH)
.expect("SystemTime before UNIX EPOCH!")
.as_micros() as u64
}

pub fn duration(&self) -> Duration {
pub fn ttl(&self) -> Duration {
let expiry =
SystemTime::UNIX_EPOCH + Duration::from_micros(self.expiry.load(Ordering::SeqCst));
expiry
Expand All @@ -83,14 +78,14 @@ impl AtomicExpiryTime {
self.expiry.load(Ordering::SeqCst) <= when
}

#[allow(dead_code)]
#[cfg(feature = "redis_storage")]
pub fn update(&self, expiry: SystemTime) {
self.expiry
.store(Self::since_epoch(expiry), Ordering::SeqCst);
}

pub fn update_if_expired(&self, ttl: u64, when: SystemTime) -> bool {
let ttl_micros = ttl * 1_000_000;
pub fn update_if_expired(&self, ttl: Duration, when: SystemTime) -> bool {
let ttl_micros = u64::try_from(ttl.as_micros()).expect("Wow! The future is here!");
let when_micros = Self::since_epoch(when);
let expiry = self.expiry.load(Ordering::SeqCst);
if expiry <= when_micros {
Expand Down Expand Up @@ -208,7 +203,7 @@ mod tests {
fn updates_when_valid() {
let now = SystemTime::now();
let val = AtomicExpiringValue::new(42, now + Duration::from_secs(1));
val.update(3, 10, now);
val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
}

Expand All @@ -217,7 +212,7 @@ mod tests {
let now = SystemTime::now();
let val = AtomicExpiringValue::new(42, now);
assert_eq!(val.ttl(), Duration::ZERO);
val.update(3, 10, now);
val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
}

Expand All @@ -228,10 +223,14 @@ mod tests {

thread::scope(|s| {
s.spawn(|| {
atomic_expiring_value.update(1, 1, now);
atomic_expiring_value.update(1, Duration::from_secs(1), now);
});
s.spawn(|| {
atomic_expiring_value.update(2, 1, now + Duration::from_secs(11));
atomic_expiring_value.update(
2,
Duration::from_secs(1),
now + Duration::from_secs(11),
);
});
});
assert!([2u64, 3u64].contains(&atomic_expiring_value.value.load(Ordering::SeqCst)));
Expand Down
12 changes: 8 additions & 4 deletions limitador/src/storage/disk/expiring_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ impl ExpiringValue {
}

#[must_use]
pub fn update(self, delta: u64, ttl: u64, now: SystemTime) -> Self {
pub fn update(self, delta: u64, ttl: Duration, now: SystemTime) -> Self {
let expiry = if self.expiry <= now {
now + Duration::from_secs(ttl)
now + ttl
} else {
self.expiry
};
Expand Down Expand Up @@ -132,7 +132,11 @@ mod tests {
#[test]
fn updates_when_valid() {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(3, 10, now);
let val = ExpiringValue::new(42, now + Duration::from_secs(1)).update(
3,
Duration::from_secs(10),
now,
);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 45);
}

Expand All @@ -141,7 +145,7 @@ mod tests {
let now = SystemTime::now();
let val = ExpiringValue::new(42, now);
assert_eq!(val.ttl(), Duration::ZERO);
let val = val.update(3, 10, now);
let val = val.update(3, Duration::from_secs(10), now);
assert_eq!(val.value_at(now - Duration::from_secs(1)), 3);
}

Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/disk/rocksdb_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ impl RocksDbStorage {
let _entered = span.enter();
self.db
.merge(key, <ExpiringValue as Into<Vec<u8>>>::into(expiring_value))?;
return Ok(value.update(delta, counter.seconds(), now));
return Ok(value.update(delta, counter.window(), now));
}
Ok(value)
}
Expand Down
13 changes: 5 additions & 8 deletions limitador/src/storage/distributed/cr_counter_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ impl<A: Ord> CrCounterValue<A> {
ourselves: actor,
value: Default::default(),
others: RwLock::default(),
expiry: AtomicExpiryTime::from_now(time_window),
expiry: AtomicExpiryTime::new(SystemTime::now() + time_window),
}
}

Expand All @@ -43,7 +43,7 @@ impl<A: Ord> CrCounterValue<A> {
}

pub fn inc_at(&self, increment: u64, time_window: Duration, when: SystemTime) {
if self.expiry.update_if_expired(time_window.as_secs(), when) {
if self.expiry.update_if_expired(time_window, when) {
self.value.store(increment, Ordering::SeqCst);
} else {
self.value.fetch_add(increment, Ordering::SeqCst);
Expand All @@ -59,10 +59,7 @@ impl<A: Ord> CrCounterValue<A> {
self.inc_at(increment, time_window, when);
} else {
let mut guard = self.others.write().unwrap();
if self
.expiry
.update_if_expired(time_window.as_micros() as u64, when)
{
if self.expiry.update_if_expired(time_window, when) {
guard.insert(actor, increment);
} else {
*guard.entry(actor).or_insert(0) += increment;
Expand Down Expand Up @@ -109,7 +106,7 @@ impl<A: Ord> CrCounterValue<A> {
}

pub fn ttl(&self) -> Duration {
self.expiry.duration()
self.expiry.ttl()
}

pub fn expiry(&self) -> SystemTime {
Expand Down Expand Up @@ -282,6 +279,6 @@ mod tests {
a.inc(3, later);
b.inc(2, later);
a.merge(b);
assert!(a.expiry.duration() < sooner);
assert!(a.expiry.ttl() < sooner);
}
}
17 changes: 8 additions & 9 deletions limitador/src/storage/distributed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl CounterStorage for CrInMemoryStorage {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(CrCounterValue::new(
self.identifier.clone(),
Duration::from_secs(counter.seconds()),
counter.window(),
))
}),
Some(counter) => counter,
Expand All @@ -82,16 +82,16 @@ impl CounterStorage for CrInMemoryStorage {
match limits_by_namespace.entry(counter.limit().namespace().clone()) {
Entry::Vacant(v) => {
let mut limits = HashMap::new();
let duration = Duration::from_secs(counter.seconds());
let counter_val = CrCounterValue::new(self.identifier.clone(), duration);
let counter_val =
CrCounterValue::new(self.identifier.clone(), counter.window());
self.increment_counter(counter.clone(), &counter_val, delta, now);
limits.insert(counter.limit().clone(), counter_val);
v.insert(limits);
}
Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) {
Entry::Vacant(v) => {
let duration = Duration::from_secs(counter.seconds());
let counter_value = CrCounterValue::new(self.identifier.clone(), duration);
let counter_value =
CrCounterValue::new(self.identifier.clone(), counter.window());
self.increment_counter(counter.clone(), &counter_value, delta, now);
v.insert(counter_value);
}
Expand Down Expand Up @@ -158,7 +158,7 @@ impl CounterStorage for CrInMemoryStorage {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(CrCounterValue::new(
self.identifier.clone(),
Duration::from_secs(counter.seconds()),
counter.window(),
))
}),
Some(counter) => counter,
Expand Down Expand Up @@ -338,8 +338,7 @@ impl CrInMemoryStorage {
delta: u64,
when: SystemTime,
) {
counter.inc_at(delta, Duration::from_secs(key.seconds()), when);

counter.inc_at(delta, key.window(), when);
let counter = counter.clone();
let (expiry, values) = counter.into_inner();
let key: CounterKey = key.into();
Expand All @@ -366,7 +365,7 @@ impl From<Counter> for CounterKey {
fn from(value: Counter) -> Self {
Self {
namespace: value.namespace().clone(),
seconds: value.seconds(),
seconds: value.window().as_secs(),
variables: value.limit().variables(),
conditions: value.limit().conditions(),
vars: value.set_variables().clone(),
Expand Down
32 changes: 10 additions & 22 deletions limitador/src/storage/in_memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,36 +56,27 @@ impl CounterStorage for InMemoryStorage {
if counter.is_qualified() {
let value = match self.qualified_counters.get(counter) {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(AtomicExpiringValue::new(
0,
now + Duration::from_secs(counter.seconds()),
))
Arc::new(AtomicExpiringValue::new(0, now + counter.window()))
}),
Some(counter) => counter,
};
value.update(delta, counter.seconds(), now);
value.update(delta, counter.window(), now);
} else {
match limits_by_namespace.entry(counter.limit().namespace().clone()) {
Entry::Vacant(v) => {
let mut limits = HashMap::new();
limits.insert(
counter.limit().clone(),
AtomicExpiringValue::new(
delta,
now + Duration::from_secs(counter.seconds()),
),
AtomicExpiringValue::new(delta, now + counter.window()),
);
v.insert(limits);
}
Entry::Occupied(mut o) => match o.get_mut().entry(counter.limit().clone()) {
Entry::Vacant(v) => {
v.insert(AtomicExpiringValue::new(
delta,
now + Duration::from_secs(counter.seconds()),
));
v.insert(AtomicExpiringValue::new(delta, now + counter.window()));
}
Entry::Occupied(o) => {
o.get().update(delta, counter.seconds(), now);
o.get().update(delta, counter.window(), now);
}
},
}
Expand All @@ -102,8 +93,8 @@ impl CounterStorage for InMemoryStorage {
) -> Result<Authorization, StorageErr> {
let limits_by_namespace = self.limits_for_namespace.read().unwrap();
let mut first_limited = None;
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, u64)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, u64)> =
let mut counter_values_to_update: Vec<(&AtomicExpiringValue, Duration)> = Vec::new();
let mut qualified_counter_values_to_updated: Vec<(Arc<AtomicExpiringValue>, Duration)> =
Vec::new();
let now = SystemTime::now();

Expand Down Expand Up @@ -138,17 +129,14 @@ impl CounterStorage for InMemoryStorage {
return Ok(limited);
}
}
counter_values_to_update.push((atomic_expiring_value, counter.seconds()));
counter_values_to_update.push((atomic_expiring_value, counter.window()));
}

// Process qualified counters
for counter in counters.iter_mut().filter(|c| c.is_qualified()) {
let value = match self.qualified_counters.get(counter) {
None => self.qualified_counters.get_with(counter.clone(), || {
Arc::new(AtomicExpiringValue::new(
0,
now + Duration::from_secs(counter.seconds()),
))
Arc::new(AtomicExpiringValue::new(0, now + counter.window()))
}),
Some(counter) => counter,
};
Expand All @@ -159,7 +147,7 @@ impl CounterStorage for InMemoryStorage {
}
}

qualified_counter_values_to_updated.push((value, counter.seconds()));
qualified_counter_values_to_updated.push((value, counter.window()));
}

if let Some(limited) = first_limited {
Expand Down
2 changes: 1 addition & 1 deletion limitador/src/storage/keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub mod bin {

CounterKey {
ns: counter.namespace().as_ref(),
seconds: counter.seconds(),
seconds: counter.window().as_secs(),
conditions,
variables: counter.variables_for_key(),
}
Expand Down
Loading
Loading