Skip to content

Commit

Permalink
Merge pull request #49 from fetlife/fix-type-of-retry-option
Browse files Browse the repository at this point in the history
Fix type of retry option: `retry` can be boolean or a number
  • Loading branch information
film42 authored Oct 2, 2024
2 parents c1822b7 + 291b99d commit 40232a4
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 29 deletions.
83 changes: 72 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ pub type Result<T> = std::result::Result<T, Error>;
pub fn opts() -> EnqueueOpts {
EnqueueOpts {
queue: "default".into(),
retry: true,
retry: RetryOpts::Yes,
unique_for: None,
}
}

pub struct EnqueueOpts {
queue: String,
retry: bool,
retry: RetryOpts,
unique_for: Option<std::time::Duration>,
}

Expand All @@ -78,8 +78,14 @@ impl EnqueueOpts {
}

#[must_use]
pub fn retry(self, retry: bool) -> Self {
Self { retry, ..self }
pub fn retry<RO>(self, retry: RO) -> Self
where
RO: Into<RetryOpts>,
{
Self {
retry: retry.into(),
..self
}
}

#[must_use]
Expand All @@ -106,7 +112,7 @@ impl EnqueueOpts {
jid: new_jid(),
created_at: chrono::Utc::now().timestamp() as f64,
enqueued_at: None,
retry: self.retry,
retry: self.retry.clone(),
args,

// Make default eventually...
Expand Down Expand Up @@ -178,7 +184,7 @@ fn new_jid() -> String {

pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
queue: String,
retry: bool,
retry: RetryOpts,
args: PhantomData<Args>,
worker: PhantomData<W>,
unique_for: Option<std::time::Duration>,
Expand All @@ -192,16 +198,22 @@ where
pub fn new() -> Self {
Self {
queue: "default".into(),
retry: true,
retry: RetryOpts::Yes,
args: PhantomData,
worker: PhantomData,
unique_for: None,
}
}

#[must_use]
pub fn retry(self, retry: bool) -> Self {
Self { retry, ..self }
pub fn retry<RO>(self, retry: RO) -> Self
where
RO: Into<RetryOpts>,
{
Self {
retry: retry.into(),
..self
}
}

#[must_use]
Expand Down Expand Up @@ -250,7 +262,7 @@ where
impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts {
fn from(opts: &WorkerOpts<Args, W>) -> Self {
Self {
retry: opts.retry,
retry: opts.retry.clone(),
queue: opts.queue.clone(),
unique_for: opts.unique_for,
}
Expand Down Expand Up @@ -391,6 +403,31 @@ impl WorkerRef {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum RetryOpts {
#[serde(rename = "true")]
Yes,
#[serde(rename = "false")]
Never,
Max(usize),
}

impl From<bool> for RetryOpts {
fn from(value: bool) -> Self {
match value {
true => RetryOpts::Yes,
false => RetryOpts::Never,
}
}
}

impl From<usize> for RetryOpts {
fn from(value: usize) -> Self {
RetryOpts::Max(value)
}
}

//
// {
// "retry": true,
Expand All @@ -410,7 +447,7 @@ impl WorkerRef {
pub struct Job {
pub queue: String,
pub args: JsonValue,
pub retry: bool,
pub retry: RetryOpts,
pub class: String,
pub jid: String,
pub created_at: f64,
Expand Down Expand Up @@ -533,6 +570,30 @@ mod test {
pub mod workers {
use super::super::super::super::*;

pub struct TestOpts;

#[async_trait]
impl Worker<()> for TestOpts {
fn opts() -> WorkerOpts<(), Self>
where
Self: Sized,
{
WorkerOpts::new()
// Test bool
.retry(false)
// Test usize
.retry(42)
// Test the new type
.retry(RetryOpts::Never)
.unique_for(std::time::Duration::from_secs(30))
.queue("yolo_quue")
}

async fn perform(&self, _args: ()) -> Result<()> {
Ok(())
}
}

pub struct X1Y2MyJob;

#[async_trait]
Expand Down
16 changes: 11 additions & 5 deletions src/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Result;
use crate::{Counter, Job, RedisPool, UnitOfWork, WorkerRef};
use crate::{Counter, Job, RedisPool, RetryOpts, UnitOfWork, WorkerRef};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -158,7 +158,13 @@ impl ServerMiddleware for RetryMiddleware {
worker: Arc<WorkerRef>,
redis: RedisPool,
) -> Result<()> {
let max_retries = worker.max_retries();
// Check the job for a max retries N in the retry field and then fall
// back to the worker default max retries.
let max_retries = if let RetryOpts::Max(max_retries) = job.retry {
max_retries
} else {
worker.max_retries()
};

let err = {
match chain.next(job, worker, redis.clone()).await {
Expand All @@ -180,7 +186,7 @@ impl ServerMiddleware for RetryMiddleware {
job.retry_count = Some(retry_count);

// Attempt the retry.
if retry_count > max_retries {
if retry_count > max_retries || job.retry == RetryOpts::Never {
error!({
"status" = "fail",
"class" = &job.class,
Expand All @@ -207,7 +213,7 @@ impl ServerMiddleware for RetryMiddleware {
#[cfg(test)]
mod test {
use super::*;
use crate::{RedisConnectionManager, RedisPool, Worker};
use crate::{RedisConnectionManager, RedisPool, RetryOpts, Worker};
use bb8::Pool;
use tokio::sync::Mutex;

Expand All @@ -221,7 +227,7 @@ mod test {
class: "TestWorker".into(),
queue: "default".into(),
args: vec![1337].into(),
retry: true,
retry: RetryOpts::Yes,
jid: crate::new_jid(),
created_at: 1337.0,
enqueued_at: None,
Expand Down
30 changes: 17 additions & 13 deletions src/periodic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Result;
use crate::{new_jid, Error, Job, Processor, RedisConnection, RedisPool, Worker};
use crate::{new_jid, Error, Job, Processor, RedisConnection, RedisPool, RetryOpts, Worker};
pub use cron_clock::{Schedule as Cron, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
Expand All @@ -19,7 +19,7 @@ pub struct Builder {
pub(crate) name: Option<String>,
pub(crate) queue: Option<String>,
pub(crate) args: Option<JsonValue>,
pub(crate) retry: Option<bool>,
pub(crate) retry: Option<RetryOpts>,
pub(crate) cron: Cron,
}

Expand All @@ -40,6 +40,18 @@ impl Builder {
..self
}
}

#[must_use]
pub fn retry<RO>(self, retry: RO) -> Builder
where
RO: Into<RetryOpts>,
{
Self {
retry: Some(retry.into()),
..self
}
}

pub fn queue<S: Into<String>>(self, queue: S) -> Builder {
Builder {
queue: Some(queue.into()),
Expand All @@ -65,14 +77,6 @@ impl Builder {
})
}

#[must_use]
pub fn retry(self, retry: bool) -> Self {
Self {
retry: Some(retry),
..self
}
}

pub async fn register<W, Args>(self, processor: &mut Processor, worker: W) -> Result<()>
where
Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static,
Expand Down Expand Up @@ -100,7 +104,7 @@ impl Builder {
..Default::default()
};

pj.retry = self.retry;
pj.retry = self.retry.clone();
pj.queue = self.queue.clone();
pj.args = self.args.clone().map(|a| a.to_string());

Expand All @@ -117,7 +121,7 @@ pub struct PeriodicJob {
pub(crate) cron: String,
pub(crate) queue: Option<String>,
pub(crate) args: Option<String>,
retry: Option<bool>,
retry: Option<RetryOpts>,

#[serde(skip)]
cron_schedule: Option<Cron>,
Expand Down Expand Up @@ -191,7 +195,7 @@ impl PeriodicJob {
jid: new_jid(),
created_at: chrono::Utc::now().timestamp() as f64,
enqueued_at: None,
retry: self.retry.unwrap_or(false),
retry: self.retry.clone().unwrap_or(RetryOpts::Never),
args,

// Make default eventually...
Expand Down

0 comments on commit 40232a4

Please sign in to comment.