Skip to content

Commit

Permalink
Make batch size configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
alexsnaps committed May 10, 2024
1 parent 00bc5c1 commit f085f17
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 6 deletions.
3 changes: 3 additions & 0 deletions limitador-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ pub mod env {
value_for("REDIS_LOCAL_CACHE_MAX_TTL_CACHED_COUNTERS_MS");
pub static ref REDIS_LOCAL_CACHE_FLUSHING_PERIOD_MS: Option<&'static str> =
value_for("REDIS_LOCAL_CACHE_FLUSHING_PERIOD_MS");
pub static ref REDIS_LOCAL_CACHE_BATCH_SIZE: Option<&'static str> =
value_for("REDIS_LOCAL_CACHE_BATCH_SIZE");
pub static ref REDIS_LOCAL_CACHE_TTL_RATIO_CACHED_COUNTERS: Option<&'static str> =
value_for("REDIS_LOCAL_CACHE_TTL_RATIO_CACHED_COUNTERS");
pub static ref RATE_LIMIT_HEADERS: Option<&'static str> = value_for("RATE_LIMIT_HEADERS");
Expand Down Expand Up @@ -162,6 +164,7 @@ pub struct RedisStorageConfiguration {

#[derive(PartialEq, Eq, Debug)]
pub struct RedisStorageCacheConfiguration {
pub batch_size: usize,
pub flushing_period: i64,
pub max_counters: usize,
pub response_timeout: u64,
Expand Down
22 changes: 20 additions & 2 deletions limitador-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use limitador::storage::disk::DiskStorage;
#[cfg(feature = "infinispan")]
use limitador::storage::infinispan::{Consistency, InfinispanStorageBuilder};
use limitador::storage::redis::{
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_FLUSHING_PERIOD_SEC,
DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS,
AsyncRedisStorage, CachedRedisStorage, CachedRedisStorageBuilder, DEFAULT_BATCH_SIZE,
DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS,
};
use limitador::storage::{AsyncCounterStorage, AsyncStorage, Storage};
use limitador::{
Expand Down Expand Up @@ -132,6 +132,7 @@ impl Limiter {
// TODO: Not all the options are configurable via ENV. Add them as needed.

let cached_redis_storage = CachedRedisStorageBuilder::new(redis_url)
.batch_size(cache_cfg.batch_size)
.flushing_period(Duration::from_millis(cache_cfg.flushing_period as u64))
.max_cached_counters(cache_cfg.max_counters)
.response_timeout(Duration::from_millis(cache_cfg.response_timeout));
Expand Down Expand Up @@ -588,6 +589,18 @@ fn create_config() -> (Configuration, &'static str) {
.about("Uses Redis to store counters, with an in-memory cache")
.display_order(4)
.arg(redis_url_arg)
.arg(
Arg::new("batch")
.long("batch-size")
.action(ArgAction::Set)
.value_parser(clap::value_parser!(u64))
.default_value(
config::env::REDIS_LOCAL_CACHE_BATCH_SIZE
.unwrap_or(leak(DEFAULT_BATCH_SIZE)),
)
.display_order(3)
.help("Size of entries to flush in as single flush"),
)
.arg(
Arg::new("flush")
.long("flush-period")
Expand Down Expand Up @@ -720,6 +733,7 @@ fn create_config() -> (Configuration, &'static str) {
Some(("redis_cached", sub)) => StorageConfiguration::Redis(RedisStorageConfiguration {
url: sub.get_one::<String>("URL").unwrap().to_owned(),
cache: Some(RedisStorageCacheConfiguration {
batch_size: *sub.get_one("batch").unwrap(),
flushing_period: *sub.get_one("flush").unwrap(),
max_counters: *sub.get_one("max").unwrap(),
response_timeout: *sub.get_one("timeout").unwrap(),
Expand Down Expand Up @@ -799,6 +813,10 @@ fn storage_config_from_env() -> Result<StorageConfiguration, ()> {
url,
cache: if env_option_is_enabled("REDIS_LOCAL_CACHE_ENABLED") {
Some(RedisStorageCacheConfiguration {
batch_size: env::var("REDIS_LOCAL_CACHE_BATCH_SIZE")
.unwrap_or_else(|_| (DEFAULT_BATCH_SIZE).to_string())
.parse()
.expect("Expected an usize"),
flushing_period: env::var("REDIS_LOCAL_CACHE_FLUSHING_PERIOD_MS")
.unwrap_or_else(|_| (DEFAULT_FLUSHING_PERIOD_SEC * 1000).to_string())
.parse()
Expand Down
1 change: 1 addition & 0 deletions limitador/src/storage/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ mod redis_sync;
mod scripts;

pub const DEFAULT_FLUSHING_PERIOD_SEC: u64 = 1;
pub const DEFAULT_BATCH_SIZE: usize = 100;
pub const DEFAULT_MAX_CACHED_COUNTERS: usize = 10000;
pub const DEFAULT_RESPONSE_TIMEOUT_MS: u64 = 350;

Expand Down
29 changes: 25 additions & 4 deletions limitador/src/storage/redis/redis_cached.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::storage::redis::counters_cache::{
use crate::storage::redis::redis_async::AsyncRedisStorage;
use crate::storage::redis::scripts::BATCH_UPDATE_COUNTERS;
use crate::storage::redis::{
DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS, DEFAULT_RESPONSE_TIMEOUT_MS,
DEFAULT_BATCH_SIZE, DEFAULT_FLUSHING_PERIOD_SEC, DEFAULT_MAX_CACHED_COUNTERS,
DEFAULT_RESPONSE_TIMEOUT_MS,
};
use crate::storage::{AsyncCounterStorage, Authorization, StorageErr};
use async_trait::async_trait;
Expand Down Expand Up @@ -146,6 +147,7 @@ impl CachedRedisStorage {
pub async fn new(redis_url: &str) -> Result<Self, RedisError> {
Self::new_with_options(
redis_url,
DEFAULT_BATCH_SIZE,
Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC),
DEFAULT_MAX_CACHED_COUNTERS,
Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS),
Expand All @@ -155,6 +157,7 @@ impl CachedRedisStorage {

async fn new_with_options(
redis_url: &str,
batch_size: usize,
flushing_period: Duration,
max_cached_counters: usize,
response_timeout: Duration,
Expand Down Expand Up @@ -193,6 +196,7 @@ impl CachedRedisStorage {
storage.is_alive().await,
counters_cache_clone.clone(),
p.clone(),
batch_size,
)
.await;
}
Expand Down Expand Up @@ -224,6 +228,7 @@ fn flip_partitioned(storage: &AtomicBool, partition: bool) -> bool {

pub struct CachedRedisStorageBuilder {
redis_url: String,
batch_size: usize,
flushing_period: Duration,
max_cached_counters: usize,
response_timeout: Duration,
Expand All @@ -233,12 +238,18 @@ impl CachedRedisStorageBuilder {
pub fn new(redis_url: &str) -> Self {
Self {
redis_url: redis_url.to_string(),
batch_size: DEFAULT_BATCH_SIZE,
flushing_period: Duration::from_secs(DEFAULT_FLUSHING_PERIOD_SEC),
max_cached_counters: DEFAULT_MAX_CACHED_COUNTERS,
response_timeout: Duration::from_millis(DEFAULT_RESPONSE_TIMEOUT_MS),
}
}

pub fn batch_size(mut self, batch_size: usize) -> Self {
self.batch_size = batch_size;
self
}

pub fn flushing_period(mut self, flushing_period: Duration) -> Self {
self.flushing_period = flushing_period;
self
Expand All @@ -257,6 +268,7 @@ impl CachedRedisStorageBuilder {
pub async fn build(self) -> Result<CachedRedisStorage, RedisError> {
CachedRedisStorage::new_with_options(
&self.redis_url,
self.batch_size,
self.flushing_period,
self.max_cached_counters,
self.response_timeout,
Expand Down Expand Up @@ -319,6 +331,7 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(
storage_is_alive: bool,
cached_counters: Arc<CountersCache>,
partitioned: Arc<AtomicBool>,
batch_size: usize,
) {
if partitioned.load(Ordering::Acquire) || !storage_is_alive {
if !cached_counters.batcher().is_empty() {
Expand All @@ -327,7 +340,9 @@ async fn flush_batcher_and_update_counters<C: ConnectionLike>(
} else {
let updated_counters = cached_counters
.batcher()
.consume(100, |counters| update_counters(&mut redis_conn, counters))
.consume(batch_size, |counters| {
update_counters(&mut redis_conn, counters)
})
.await
.or_else(|err| {
if err.is_transient() {
Expand Down Expand Up @@ -486,8 +501,14 @@ mod tests {
assert_eq!(c.hits(&counter), 2);
}

flush_batcher_and_update_counters(mock_client, true, cached_counters.clone(), partitioned)
.await;
flush_batcher_and_update_counters(
mock_client,
true,
cached_counters.clone(),
partitioned,
100,
)
.await;

if let Some(c) = cached_counters.get(&counter) {
assert_eq!(c.hits(&counter), 8);
Expand Down

0 comments on commit f085f17

Please sign in to comment.