From 95f3617b4189ee60ee201cc2219ebf1abf920002 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Fri, 4 Oct 2024 00:09:44 +0200 Subject: [PATCH 1/2] Add tests around new RetryOpts features As usual, caught a few things writing tests. Tests are awesome. --- Cargo.lock | 89 +++++++++++++++++- Cargo.toml | 1 + src/lib.rs | 76 +++++++++++++-- src/processor.rs | 2 +- tests/server_middleware_test.rs | 161 +++++++++++++++++++++++++++++++- 5 files changed, 315 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6e319c..acc91a6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -334,6 +334,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13e2792b0ff0340399d58445b88fd9770e3489eff258a4cbc1523418f12abf84" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.26" @@ -341,6 +356,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2e5317663a9089767a1ec00a487df42e0ca174b61b4483213ac24448e4664df5" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -349,6 +365,23 @@ version = "0.3.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec90ff4d0fe1f57d600049061dc6bb68ed03c7d2fbd697274c41805dcb3f8608" +[[package]] +name = "futures-executor" +version = "0.3.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8de0a35a6ab97ec8869e32a2473f4b1324459e14c29275d14b10cb1fd19b50e" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" + [[package]] name = "futures-sink" version = "0.3.26" @@ -369,8 +402,10 @@ checksum = "9c1d6de3acfef38d2be4b1f543f553131788603495be83da675e180c8d6b7bd1" dependencies = [ "futures-channel", "futures-core", + "futures-io", "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", "slab", @@ -643,9 +678,12 @@ checksum = "1a5b3dd1c072ee7963717671d1ca129f1048fda25edea6b752bfc71ac8854170" [[package]] name = "once_cell" -version = "1.17.0" +version = "1.20.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6f61fba1741ea2b3d6a1e3178721804bb716a68a6aeba1149b5d52e3d464ea66" +checksum = "82881c4be219ab5faaf2ad5e5e5ecdff8c66bd7402ca3160975c93b24961afd1" +dependencies = [ + "portable-atomic", +] [[package]] name = "overload" @@ -694,6 +732,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "portable-atomic" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc9c68a3f6da06753e9335d63e27f6b9754dd1920d941135b7ea8224f141adb2" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -861,6 +905,7 @@ dependencies = [ "redis", "serde", "serde_json", + "serial_test", "sha2", "simple-process-stats", "slog-term", @@ -877,6 +922,15 @@ version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b4b9743ed687d4b4bcedf9ff5eaa7398495ae14e61cba0a295704edbc7decde" +[[package]] +name = "scc" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "836f1e0f4963ef5288b539b643b35e043e76a32d0f4e47e67febf69576527f50" +dependencies = [ + "sdd", +] + [[package]] name = "scopeguard" version = "1.1.0" @@ -889,6 +943,12 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddccb15bcce173023b3fedd9436f882a0739b8dfb45e4f6b6002bee5929f61b2" +[[package]] +name = "sdd" +version = "3.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a7b59a5d9b0099720b417b6325d91a52cbf5b3dcb5041d864be53eefa58abc" + [[package]] name = "serde" version = "1.0.152" @@ -920,6 +980,31 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b4b487fe2acf240a021cf57c6b2b4903b1e78ca0ecd862a71b71d2a51fed77d" +dependencies = [ + "futures", + "log", + "once_cell", + "parking_lot", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "82fe9db325bcef1fbcde82e078a5cc4efdf787e96b3b9cf45b50b529f2083d67" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "sha1_smol" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index 5c850f9..19b5f4e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,3 +42,4 @@ convert_case = "0.6.0" tracing = "0.1.40" tracing-subscriber = { version = "0.3.17", features = ["env-filter", "json"] } +serial_test = "3.1.1" diff --git a/src/lib.rs b/src/lib.rs index ead83de..65587fa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,10 @@ use async_trait::async_trait; use middleware::Chain; use rand::{Rng, RngCore}; -use serde::{Deserialize, Serialize}; +use serde::{ + de::{self, Deserializer, Visitor}, + Deserialize, Serialize, Serializer, +}; use serde_json::Value as JsonValue; use sha2::{Digest, Sha256}; use std::future::Future; @@ -96,7 +99,8 @@ impl EnqueueOpts { } } - fn create_job(&self, class: String, args: impl serde::Serialize) -> Result { + #[must_use] + pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result { let args = serde_json::to_value(args)?; // Ensure args are always wrapped in an array. @@ -233,7 +237,7 @@ where } #[allow(clippy::wrong_self_convention)] - fn into_opts(&self) -> EnqueueOpts { + pub fn into_opts(&self) -> EnqueueOpts { self.into() } @@ -403,16 +407,70 @@ impl WorkerRef { } } -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -#[serde(untagged)] +#[derive(Clone, Debug, PartialEq)] pub enum RetryOpts { - #[serde(rename = "true")] Yes, - #[serde(rename = "false")] Never, Max(usize), } +impl Serialize for RetryOpts { + fn serialize(&self, serializer: S) -> std::result::Result + where + S: Serializer, + { + match *self { + RetryOpts::Yes => serializer.serialize_bool(true), + RetryOpts::Never => serializer.serialize_bool(false), + RetryOpts::Max(value) => serializer.serialize_u64(value as u64), + } + } +} + +impl<'de> Deserialize<'de> for RetryOpts { + fn deserialize(deserializer: D) -> std::result::Result + where + D: Deserializer<'de>, + { + struct RetryOptsVisitor; + + impl<'de> Visitor<'de> for RetryOptsVisitor { + type Value = RetryOpts; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a boolean, null, or a positive integer") + } + + fn visit_bool(self, value: bool) -> std::result::Result + where + E: de::Error, + { + if value { + Ok(RetryOpts::Yes) + } else { + Ok(RetryOpts::Never) + } + } + + fn visit_none(self) -> std::result::Result + where + E: de::Error, + { + Ok(RetryOpts::Never) + } + + fn visit_u64(self, value: u64) -> std::result::Result + where + E: de::Error, + { + Ok(RetryOpts::Max(value as usize)) + } + } + + deserializer.deserialize_any(RetryOptsVisitor) + } +} + impl From for RetryOpts { fn from(value: bool) -> Self { match value { @@ -463,8 +521,8 @@ pub struct Job { #[derive(Debug)] pub struct UnitOfWork { - queue: String, - job: Job, + pub queue: String, + pub job: Job, } impl UnitOfWork { diff --git a/src/processor.rs b/src/processor.rs index 85f3bf8..f0149a8 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -89,7 +89,7 @@ impl Processor { self } - async fn fetch(&mut self) -> Result> { + pub async fn fetch(&mut self) -> Result> { let response: Option<(String, String)> = self .redis .get() diff --git a/tests/server_middleware_test.rs b/tests/server_middleware_test.rs index 2948a8f..bef7f54 100644 --- a/tests/server_middleware_test.rs +++ b/tests/server_middleware_test.rs @@ -2,9 +2,10 @@ mod test { use async_trait::async_trait; use bb8::Pool; + use serial_test::serial; use sidekiq::{ - ChainIter, Job, Processor, RedisConnectionManager, RedisPool, Result, ServerMiddleware, - WorkFetcher, Worker, WorkerRef, + ChainIter, Error, Job, Processor, RedisConnectionManager, RedisPool, Result, RetryOpts, + Scheduled, ServerMiddleware, UnitOfWork, WorkFetcher, Worker, WorkerRef, }; use std::sync::{Arc, Mutex}; @@ -18,6 +19,7 @@ mod test { async fn flushall(&self) { let mut conn = self.get().await.unwrap(); let _: String = redis::cmd("FLUSHALL") + .arg("SYNC") .query_async(conn.unnamespaced_borrow_mut()) .await .unwrap(); @@ -36,6 +38,16 @@ mod test { (p, redis) } + #[derive(Clone)] + struct AlwaysFailWorker; + + #[async_trait] + impl Worker<()> for AlwaysFailWorker { + async fn perform(&self, _args: ()) -> Result<()> { + Err(Error::Message("big ouchie".to_string())) + } + } + #[derive(Clone)] struct TestWorker { did_process: Arc>, @@ -80,6 +92,7 @@ mod test { } #[tokio::test] + #[serial] async fn can_process_job_with_middleware() { let worker = TestWorker { did_process: Arc::new(Mutex::new(false)), @@ -107,6 +120,7 @@ mod test { } #[tokio::test] + #[serial] async fn can_prevent_job_from_being_processed_with_halting_middleware() { let worker = TestWorker { did_process: Arc::new(Mutex::new(false)), @@ -132,4 +146,147 @@ mod test { assert!(!*worker.did_process.lock().unwrap()); assert!(*middleware.did_process.lock().unwrap()); } + + #[tokio::test] + #[serial] + async fn can_retry_a_job() { + let worker = AlwaysFailWorker; + let queue = "failure_zone".to_string(); + let (mut p, redis) = new_base_processor(queue.clone()).await; + p.register(worker.clone()); + + AlwaysFailWorker::opts() + .queue(queue) + .retry(true) + .perform_async(&redis, ()) + .await + .unwrap(); + + assert_eq!(p.process_one_tick_once().await.unwrap(), WorkFetcher::Done); + let sets = vec!["retry".to_string()]; + let sched = Scheduled::new(redis.clone()); + let future_date = chrono::Utc::now() + chrono::Duration::days(30); + + // We should be able to reenqueue the job. + let n_jobs_retried = sched.enqueue_jobs(future_date, &sets).await; + assert!(n_jobs_retried.is_ok()); + let n_jobs_retried = n_jobs_retried.unwrap(); + assert_eq!(n_jobs_retried, 1, "one job in the retry queue"); + + // Let's grab that job. + let job = p.fetch().await; + assert!(job.is_ok()); + let job = job.unwrap(); + assert!(job.is_some()); + let job = job.unwrap(); + + assert_eq!(job.job.retry, RetryOpts::Yes); + assert_eq!(job.job.retry_count, Some(1)); + assert_eq!(job.job.class, "AlwaysFailWorker"); + } + + #[tokio::test] + #[serial] + async fn can_retry_only_until_the_max_global_retries() { + let worker = AlwaysFailWorker; + let queue = "failure_zone_global".to_string(); + let (mut p, redis) = new_base_processor(queue.clone()).await; + p.register(worker.clone()); + + let mut job = AlwaysFailWorker::opts() + .queue(queue) + .retry(true) + .into_opts() + .create_job(AlwaysFailWorker::class_name(), ()) + .expect("never fails"); + + // One last retry remaining. + assert_eq!(worker.max_retries(), 25, "default is 25 retries"); + job.retry_count = Some(worker.max_retries()); + + UnitOfWork::from_job(job) + .enqueue(&redis) + .await + .expect("enqueues"); + + assert_eq!(p.process_one_tick_once().await.unwrap(), WorkFetcher::Done); + let sets = vec!["retry".to_string()]; + let sched = Scheduled::new(redis.clone()); + let future_date = chrono::Utc::now() + chrono::Duration::days(30); + + // We should have no jobs that need retrying. + let n_jobs_retried = sched.enqueue_jobs(future_date, &sets).await; + assert!(n_jobs_retried.is_ok()); + let n_jobs_retried = n_jobs_retried.unwrap(); + + assert_eq!(n_jobs_retried, 0, "no jobs in the retry queue"); + } + + #[tokio::test] + #[serial] + async fn can_retry_based_on_job_opts_retries() { + let worker = AlwaysFailWorker; + let queue = "failure_zone_max_on_job".to_string(); + let (mut p, redis) = new_base_processor(queue.clone()).await; + p.register(worker.clone()); + + let mut job = AlwaysFailWorker::opts() + .queue(queue) + .retry(5) + .into_opts() + .create_job(AlwaysFailWorker::class_name(), ()) + .expect("never fails"); + + // One last retry remaining from the retry(5) on the job params. + assert_eq!(worker.max_retries(), 25, "default is 25 retries"); + job.retry_count = Some(5); + + UnitOfWork::from_job(job) + .enqueue(&redis) + .await + .expect("enqueues"); + + assert_eq!(p.process_one_tick_once().await.unwrap(), WorkFetcher::Done); + let sets = vec!["retry".to_string()]; + let sched = Scheduled::new(redis.clone()); + let future_date = chrono::Utc::now() + chrono::Duration::days(30); + + // We should have no jobs that need retrying. + let n_jobs_retried = sched.enqueue_jobs(future_date, &sets).await; + assert!(n_jobs_retried.is_ok()); + let n_jobs_retried = n_jobs_retried.unwrap(); + + assert_eq!(n_jobs_retried, 0, "no jobs in the retry queue"); + } + + #[tokio::test] + #[serial] + async fn can_set_retry_to_false_per_job() { + let worker = AlwaysFailWorker; + let queue = "failure_zone_never_retry_the_job".to_string(); + let (mut p, redis) = new_base_processor(queue.clone()).await; + p.register(worker.clone()); + + AlwaysFailWorker::opts() + .queue(queue) + .retry(false) + .perform_async(&redis, ()) + .await + .expect("never fails"); + + // One last retry remaining from the retry(5) on the job params. + assert_eq!(worker.max_retries(), 25, "default is 25 retries"); + + assert_eq!(p.process_one_tick_once().await.unwrap(), WorkFetcher::Done); + let sets = vec!["retry".to_string()]; + let sched = Scheduled::new(redis.clone()); + let future_date = chrono::Utc::now() + chrono::Duration::days(30); + + // We should have no jobs that need retrying. + let n_jobs_retried = sched.enqueue_jobs(future_date, &sets).await; + assert!(n_jobs_retried.is_ok()); + let n_jobs_retried = n_jobs_retried.unwrap(); + + assert_eq!(n_jobs_retried, 0, "no jobs in the retry queue"); + } } From 0782e18b76701468c472e7a102bfc3f4fc12ae64 Mon Sep 17 00:00:00 2001 From: Garrett Thornburg Date: Fri, 4 Oct 2024 00:11:50 +0200 Subject: [PATCH 2/2] Make clippy happy --- src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 65587fa..3a80bae 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -99,7 +99,6 @@ impl EnqueueOpts { } } - #[must_use] pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result { let args = serde_json::to_value(args)?;