diff --git a/src/lib.rs b/src/lib.rs index 8a94598..ead83de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -57,14 +57,14 @@ pub type Result = std::result::Result; pub fn opts() -> EnqueueOpts { EnqueueOpts { queue: "default".into(), - retry: true, + retry: RetryOpts::Yes, unique_for: None, } } pub struct EnqueueOpts { queue: String, - retry: bool, + retry: RetryOpts, unique_for: Option, } @@ -78,8 +78,14 @@ impl EnqueueOpts { } #[must_use] - pub fn retry(self, retry: bool) -> Self { - Self { retry, ..self } + pub fn retry(self, retry: RO) -> Self + where + RO: Into, + { + Self { + retry: retry.into(), + ..self + } } #[must_use] @@ -106,7 +112,7 @@ impl EnqueueOpts { jid: new_jid(), created_at: chrono::Utc::now().timestamp() as f64, enqueued_at: None, - retry: self.retry, + retry: self.retry.clone(), args, // Make default eventually... @@ -178,7 +184,7 @@ fn new_jid() -> String { pub struct WorkerOpts + ?Sized> { queue: String, - retry: bool, + retry: RetryOpts, args: PhantomData, worker: PhantomData, unique_for: Option, @@ -192,7 +198,7 @@ where pub fn new() -> Self { Self { queue: "default".into(), - retry: true, + retry: RetryOpts::Yes, args: PhantomData, worker: PhantomData, unique_for: None, @@ -200,8 +206,14 @@ where } #[must_use] - pub fn retry(self, retry: bool) -> Self { - Self { retry, ..self } + pub fn retry(self, retry: RO) -> Self + where + RO: Into, + { + Self { + retry: retry.into(), + ..self + } } #[must_use] @@ -250,7 +262,7 @@ where impl> From<&WorkerOpts> for EnqueueOpts { fn from(opts: &WorkerOpts) -> Self { Self { - retry: opts.retry, + retry: opts.retry.clone(), queue: opts.queue.clone(), unique_for: opts.unique_for, } @@ -391,6 +403,31 @@ impl WorkerRef { } } +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +#[serde(untagged)] +pub enum RetryOpts { + #[serde(rename = "true")] + Yes, + #[serde(rename = "false")] + Never, + Max(usize), +} + +impl From for RetryOpts { + fn from(value: bool) -> Self { + match value { + true => RetryOpts::Yes, + false => RetryOpts::Never, + } + } +} + +impl From for RetryOpts { + fn from(value: usize) -> Self { + RetryOpts::Max(value) + } +} + // // { // "retry": true, @@ -410,7 +447,7 @@ impl WorkerRef { pub struct Job { pub queue: String, pub args: JsonValue, - pub retry: bool, + pub retry: RetryOpts, pub class: String, pub jid: String, pub created_at: f64, @@ -533,6 +570,30 @@ mod test { pub mod workers { use super::super::super::super::*; + pub struct TestOpts; + + #[async_trait] + impl Worker<()> for TestOpts { + fn opts() -> WorkerOpts<(), Self> + where + Self: Sized, + { + WorkerOpts::new() + // Test bool + .retry(false) + // Test usize + .retry(42) + // Test the new type + .retry(RetryOpts::Never) + .unique_for(std::time::Duration::from_secs(30)) + .queue("yolo_quue") + } + + async fn perform(&self, _args: ()) -> Result<()> { + Ok(()) + } + } + pub struct X1Y2MyJob; #[async_trait] diff --git a/src/middleware.rs b/src/middleware.rs index 4762efe..c4bf702 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,5 +1,5 @@ use super::Result; -use crate::{Counter, Job, RedisPool, UnitOfWork, WorkerRef}; +use crate::{Counter, Job, RedisPool, RetryOpts, UnitOfWork, WorkerRef}; use async_trait::async_trait; use std::sync::Arc; use tokio::sync::RwLock; @@ -158,7 +158,13 @@ impl ServerMiddleware for RetryMiddleware { worker: Arc, redis: RedisPool, ) -> Result<()> { - let max_retries = worker.max_retries(); + // Check the job for a max retries N in the retry field and then fall + // back to the worker default max retries. + let max_retries = if let RetryOpts::Max(max_retries) = job.retry { + max_retries + } else { + worker.max_retries() + }; let err = { match chain.next(job, worker, redis.clone()).await { @@ -180,7 +186,7 @@ impl ServerMiddleware for RetryMiddleware { job.retry_count = Some(retry_count); // Attempt the retry. - if retry_count > max_retries { + if retry_count > max_retries || job.retry == RetryOpts::Never { error!({ "status" = "fail", "class" = &job.class, @@ -207,7 +213,7 @@ impl ServerMiddleware for RetryMiddleware { #[cfg(test)] mod test { use super::*; - use crate::{RedisConnectionManager, RedisPool, Worker}; + use crate::{RedisConnectionManager, RedisPool, RetryOpts, Worker}; use bb8::Pool; use tokio::sync::Mutex; @@ -221,7 +227,7 @@ mod test { class: "TestWorker".into(), queue: "default".into(), args: vec![1337].into(), - retry: true, + retry: RetryOpts::Yes, jid: crate::new_jid(), created_at: 1337.0, enqueued_at: None, diff --git a/src/periodic.rs b/src/periodic.rs index 2d46398..c4af1d5 100644 --- a/src/periodic.rs +++ b/src/periodic.rs @@ -1,5 +1,5 @@ use super::Result; -use crate::{new_jid, Error, Job, Processor, RedisConnection, RedisPool, Worker}; +use crate::{new_jid, Error, Job, Processor, RedisConnection, RedisPool, RetryOpts, Worker}; pub use cron_clock::{Schedule as Cron, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; @@ -19,7 +19,7 @@ pub struct Builder { pub(crate) name: Option, pub(crate) queue: Option, pub(crate) args: Option, - pub(crate) retry: Option, + pub(crate) retry: Option, pub(crate) cron: Cron, } @@ -40,6 +40,18 @@ impl Builder { ..self } } + + #[must_use] + pub fn retry(self, retry: RO) -> Builder + where + RO: Into, + { + Self { + retry: Some(retry.into()), + ..self + } + } + pub fn queue>(self, queue: S) -> Builder { Builder { queue: Some(queue.into()), @@ -65,14 +77,6 @@ impl Builder { }) } - #[must_use] - pub fn retry(self, retry: bool) -> Self { - Self { - retry: Some(retry), - ..self - } - } - pub async fn register(self, processor: &mut Processor, worker: W) -> Result<()> where Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static, @@ -100,7 +104,7 @@ impl Builder { ..Default::default() }; - pj.retry = self.retry; + pj.retry = self.retry.clone(); pj.queue = self.queue.clone(); pj.args = self.args.clone().map(|a| a.to_string()); @@ -117,7 +121,7 @@ pub struct PeriodicJob { pub(crate) cron: String, pub(crate) queue: Option, pub(crate) args: Option, - retry: Option, + retry: Option, #[serde(skip)] cron_schedule: Option, @@ -191,7 +195,7 @@ impl PeriodicJob { jid: new_jid(), created_at: chrono::Utc::now().timestamp() as f64, enqueued_at: None, - retry: self.retry.unwrap_or(false), + retry: self.retry.clone().unwrap_or(RetryOpts::Never), args, // Make default eventually...