diff --git a/Cargo.lock b/Cargo.lock index c0cb79b4..43770eae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2010,12 +2010,31 @@ dependencies = [ "winapi", ] +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + [[package]] name = "num-conv" version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -2659,9 +2678,9 @@ dependencies = [ [[package]] name = "redis" -version = "0.25.4" +version = "0.27.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0d7a6955c7511f60f3ba9e86c6d02b3c3f144f8c24b288d1f4e18074ab8bbec" +checksum = "92f61607c4c4442b575fbc3f31a5dd4e5dd69cfea8f6afec5b83e24f61c126ab" dependencies = [ "arc-swap", "async-trait", @@ -2671,6 +2690,7 @@ dependencies = [ "futures-util", "itoa", "native-tls", + "num-bigint", "percent-encoding", "pin-project-lite", "ryu", @@ -2678,16 +2698,16 @@ dependencies = [ "socket2", "tokio", "tokio-native-tls", - "tokio-retry", + "tokio-retry2", "tokio-util", "url", ] [[package]] name = "redis-test" -version = "0.4.0" +version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a948b3cec9e4b1fedbb0f0788e79029fb1f641b6cfefb7a15d044f803854427" +checksum = "45c7f734af2b89bb22c6e7ee45507adc698e127a90ae2f5c4054eeda54202912" dependencies = [ "futures", "redis", @@ -3305,10 +3325,10 @@ dependencies = [ ] [[package]] -name = "tokio-retry" -version = "0.3.0" +name = "tokio-retry2" +version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +checksum = "903934dba1c4c2f2e9cb460ef10b5695e0b0ecad3bf9ee7c8675e540c5e8b2d1" dependencies = [ "pin-project", "rand", diff --git a/limitador/Cargo.toml b/limitador/Cargo.toml index 6ef0f9f6..97ed5ff1 100644 --- a/limitador/Cargo.toml +++ b/limitador/Cargo.toml @@ -36,11 +36,12 @@ metrics = "0.22.3" # Optional dependencies rocksdb = { version = "0.22", optional = true, features = ["multi-threaded-cf"] } -redis = { version = "0.25", optional = true, features = [ +redis = { version = "0.27", optional = true, features = [ "connection-manager", "tokio-comp", "tls-native-tls", "tokio-native-tls-comp", + "script", ] } r2d2 = { version = "0.8", optional = true } tokio = { version = "1", optional = true, features = [ @@ -62,8 +63,8 @@ time = "0.3.36" [dev-dependencies] serial_test = "3.0" criterion = { version = "0.5.1", features = ["html_reports", "async_tokio"] } -redis-test = { version = "0.4.0", features = ["aio"] } -redis = { version = "0.25", features = [ +redis-test = { version = "0.6.0", features = ["aio"] } +redis = { version = "0.27", features = [ "connection-manager", "tokio-comp", "tls-native-tls", diff --git a/limitador/src/storage/redis/redis_async.rs b/limitador/src/storage/redis/redis_async.rs index 85563792..7b1be0a7 100644 --- a/limitador/src/storage/redis/redis_async.rs +++ b/limitador/src/storage/redis/redis_async.rs @@ -9,7 +9,7 @@ use crate::storage::redis::is_limited; use crate::storage::redis::scripts::{SCRIPT_UPDATE_COUNTER, VALUES_AND_TTLS}; use crate::storage::{AsyncCounterStorage, Authorization, StorageErr}; use async_trait::async_trait; -use redis::{AsyncCommands, RedisError}; +use redis::{AsyncCommands, ErrorKind, RedisError}; use std::collections::HashSet; use std::ops::Deref; use std::str::FromStr; @@ -56,7 +56,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { .key(key_for_counters_of_limit(counter.limit())) .arg(counter.window().as_secs()) .arg(delta) - .invoke_async::<_, ()>(&mut con) + .invoke_async::<()>(&mut con) .instrument(info_span!("datastore")) .await?; @@ -112,17 +112,35 @@ impl AsyncCounterStorage for AsyncRedisStorage { } } - // TODO: this can be optimized by using pipelines with multiple updates - for (counter_idx, key) in counter_keys.into_iter().enumerate() { + let script = redis::Script::new(SCRIPT_UPDATE_COUNTER); + let mut pipeline = redis::pipe(); + let mut pipeline = &mut pipeline; + for (counter_idx, key) in counter_keys.iter().enumerate() { let counter = &counters[counter_idx]; - redis::Script::new(SCRIPT_UPDATE_COUNTER) - .key(key) - .key(key_for_counters_of_limit(counter.limit())) - .arg(counter.window().as_secs()) - .arg(delta) - .invoke_async::<_, _>(&mut con) - .instrument(info_span!("datastore")) - .await? + pipeline = pipeline + .invoke_script( + script + .key(key) + .key(key_for_counters_of_limit(counter.limit())) + .arg(counter.window().as_secs()) + .arg(delta), + ) + .ignore() + } + if let Err(err) = pipeline + .query_async::<()>(&mut con) + .instrument(info_span!("datastore")) + .await + { + if err.kind() == ErrorKind::NoScriptError { + script.prepare_invoke().load_async(&mut con).await?; + pipeline + .query_async::<()>(&mut con) + .instrument(info_span!("datastore")) + .await?; + } else { + Err(err)?; + } } Ok(Authorization::Ok) @@ -191,7 +209,7 @@ impl AsyncCounterStorage for AsyncRedisStorage { async fn clear(&self) -> Result<(), StorageErr> { let mut con = self.conn_manager.clone(); redis::cmd("FLUSHDB") - .query_async::<_, ()>(&mut con) + .query_async::<()>(&mut con) .instrument(info_span!("datastore")) .await?; Ok(()) @@ -201,17 +219,23 @@ impl AsyncCounterStorage for AsyncRedisStorage { impl AsyncRedisStorage { pub async fn new(redis_url: &str) -> Result { let info = ConnectionInfo::from_str(redis_url)?; - Ok(Self { - conn_manager: ConnectionManager::new( + Self::new_with_conn_manager( + ConnectionManager::new( redis::Client::open(info) .expect("This couldn't fail in the past, yet now it did somehow!"), ) .await?, - }) + ) + .await } - pub fn new_with_conn_manager(conn_manager: ConnectionManager) -> Self { - Self { conn_manager } + pub async fn new_with_conn_manager( + conn_manager: ConnectionManager, + ) -> Result { + let store = Self { conn_manager }; + store.load_script(SCRIPT_UPDATE_COUNTER).await?; + store.load_script(VALUES_AND_TTLS).await?; + Ok(store) } async fn delete_counters_associated_with_limit(&self, limit: &Limit) -> Result<(), StorageErr> { @@ -233,6 +257,13 @@ impl AsyncRedisStorage { Ok(()) } + + pub(super) async fn load_script(&self, script: &str) -> Result<(), RedisError> { + let mut con = self.conn_manager.clone(); + let script = redis::Script::new(script); + script.prepare_invoke().load_async(&mut con).await?; + Ok(()) + } } #[cfg(test)] diff --git a/limitador/src/storage/redis/redis_cached.rs b/limitador/src/storage/redis/redis_cached.rs index 441d09da..fe47994f 100644 --- a/limitador/src/storage/redis/redis_cached.rs +++ b/limitador/src/storage/redis/redis_cached.rs @@ -13,7 +13,7 @@ use crate::storage::redis::{ use crate::storage::{AsyncCounterStorage, Authorization, StorageErr}; use async_trait::async_trait; use metrics::gauge; -use redis::aio::{ConnectionLike, ConnectionManager}; +use redis::aio::{ConnectionLike, ConnectionManager, ConnectionManagerConfig}; use redis::{ConnectionInfo, RedisError}; use std::collections::{HashMap, HashSet}; use std::str::FromStr; @@ -170,15 +170,13 @@ impl CachedRedisStorage { response_timeout: Duration, ) -> Result { let info = ConnectionInfo::from_str(redis_url)?; - let redis_conn_manager = ConnectionManager::new_with_backoff_and_timeouts( + let redis_conn_manager = ConnectionManager::new_with_config( redis::Client::open(info) .expect("This couldn't fail in the past, yet now it did somehow!"), - 2, - 100, - 1, - response_timeout, - // TLS handshake might result in an additional 2 RTTs to Redis, adding some headroom as well - (response_timeout * 3) + Duration::from_millis(50), + ConnectionManagerConfig::default() + .set_connection_timeout((response_timeout * 3) + Duration::from_millis(50)) + .set_response_timeout(response_timeout) + .set_number_of_retries(1), ) .await?; @@ -189,7 +187,7 @@ impl CachedRedisStorage { let counters_cache = Arc::new(cached_counters); let partitioned = Arc::new(AtomicBool::new(false)); let async_redis_storage = - AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()); + AsyncRedisStorage::new_with_conn_manager(redis_conn_manager.clone()).await?; { let counters_cache_clone = counters_cache.clone(); @@ -208,6 +206,10 @@ impl CachedRedisStorage { }); } + async_redis_storage + .load_script(BATCH_UPDATE_COUNTERS) + .await?; + Ok(Self { cached_counters: counters_cache, async_redis_storage, @@ -456,7 +458,7 @@ mod tests { counters_and_deltas.insert(counter.clone(), arc); let one_sec_from_now = SystemTime::now().add(Duration::from_secs(1)); - let mock_response = Value::Bulk(vec![ + let mock_response = Value::Array(vec![ Value::Int(NEW_VALUE_FROM_REDIS as i64), Value::Int( one_sec_from_now @@ -510,7 +512,7 @@ mod tests { Default::default(), ); - let mock_response = Value::Bulk(vec![ + let mock_response = Value::Array(vec![ Value::Int(8), Value::Int( SystemTime::now() diff --git a/limitador/src/storage/redis/redis_sync.rs b/limitador/src/storage/redis/redis_sync.rs index 08e5082b..f9b655fa 100644 --- a/limitador/src/storage/redis/redis_sync.rs +++ b/limitador/src/storage/redis/redis_sync.rs @@ -165,7 +165,7 @@ impl CounterStorage for RedisStorage { #[tracing::instrument(skip_all)] fn clear(&self) -> Result<(), StorageErr> { let mut con = self.conn_pool.get()?; - redis::cmd("FLUSHDB").execute(&mut *con); + redis::cmd("FLUSHDB").exec(&mut *con)?; Ok(()) } }