Skip to content

Commit

Permalink
Refactor retry to be true/false or N; backward compatible
Browse files Browse the repository at this point in the history
This changes JsonValue to RetryOpts which is an enum for Yes, Never, and
Max(n) to support the sidekiq bool|int typing. The public builder
interfaces for opts have now been moved to Into<RetryOpts> which will
continue to work for true/false like before but also a numeric value.

Also... I found that I hadn't impl'd the RetryMiddleware to respect a
retry: false option, so that's been fixed. I also added max_retries to
be plucked from the job if set and will be used in favor of whatever the
value was for the worker since they can be different.

TODO: Need to add a test for the new retry middleware logic. That
shouldn't have been a bug in the first place.
  • Loading branch information
film42 authored and Antti committed Oct 2, 2024
1 parent f1306a8 commit 291b99d
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 27 deletions.
79 changes: 70 additions & 9 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ pub type Result<T> = std::result::Result<T, Error>;
pub fn opts() -> EnqueueOpts {
EnqueueOpts {
queue: "default".into(),
retry: JsonValue::Bool(true),
retry: RetryOpts::Yes,
unique_for: None,
}
}

pub struct EnqueueOpts {
queue: String,
retry: JsonValue,
retry: RetryOpts,
unique_for: Option<std::time::Duration>,
}

Expand All @@ -78,8 +78,14 @@ impl EnqueueOpts {
}

#[must_use]
pub fn retry(self, retry: JsonValue) -> Self {
Self { retry, ..self }
pub fn retry<RO>(self, retry: RO) -> Self
where
RO: Into<RetryOpts>,
{
Self {
retry: retry.into(),
..self
}
}

#[must_use]
Expand Down Expand Up @@ -178,7 +184,7 @@ fn new_jid() -> String {

pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
queue: String,
retry: JsonValue,
retry: RetryOpts,
args: PhantomData<Args>,
worker: PhantomData<W>,
unique_for: Option<std::time::Duration>,
Expand All @@ -192,16 +198,22 @@ where
pub fn new() -> Self {
Self {
queue: "default".into(),
retry: JsonValue::Bool(true),
retry: RetryOpts::Yes,
args: PhantomData,
worker: PhantomData,
unique_for: None,
}
}

#[must_use]
pub fn retry(self, retry: JsonValue) -> Self {
Self { retry, ..self }
pub fn retry<RO>(self, retry: RO) -> Self
where
RO: Into<RetryOpts>,
{
Self {
retry: retry.into(),
..self
}
}

#[must_use]
Expand Down Expand Up @@ -391,6 +403,31 @@ impl WorkerRef {
}
}

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
#[serde(untagged)]
pub enum RetryOpts {
#[serde(rename = "true")]
Yes,
#[serde(rename = "false")]
Never,
Max(usize),
}

impl From<bool> for RetryOpts {
fn from(value: bool) -> Self {
match value {
true => RetryOpts::Yes,
false => RetryOpts::Never,
}
}
}

impl From<usize> for RetryOpts {
fn from(value: usize) -> Self {
RetryOpts::Max(value)
}
}

//
// {
// "retry": true,
Expand All @@ -410,7 +447,7 @@ impl WorkerRef {
pub struct Job {
pub queue: String,
pub args: JsonValue,
pub retry: JsonValue,
pub retry: RetryOpts,
pub class: String,
pub jid: String,
pub created_at: f64,
Expand Down Expand Up @@ -533,6 +570,30 @@ mod test {
pub mod workers {
use super::super::super::super::*;

pub struct TestOpts;

#[async_trait]
impl Worker<()> for TestOpts {
fn opts() -> WorkerOpts<(), Self>
where
Self: Sized,
{
WorkerOpts::new()
// Test bool
.retry(false)
// Test usize
.retry(42)
// Test the new type
.retry(RetryOpts::Never)
.unique_for(std::time::Duration::from_secs(30))
.queue("yolo_quue")
}

async fn perform(&self, _args: ()) -> Result<()> {
Ok(())
}
}

pub struct X1Y2MyJob;

#[async_trait]
Expand Down
16 changes: 11 additions & 5 deletions src/middleware.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Result;
use crate::{Counter, Job, RedisPool, UnitOfWork, WorkerRef};
use crate::{Counter, Job, RedisPool, RetryOpts, UnitOfWork, WorkerRef};
use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::RwLock;
Expand Down Expand Up @@ -158,7 +158,13 @@ impl ServerMiddleware for RetryMiddleware {
worker: Arc<WorkerRef>,
redis: RedisPool,
) -> Result<()> {
let max_retries = worker.max_retries();
// Check the job for a max retries N in the retry field and then fall
// back to the worker default max retries.
let max_retries = if let RetryOpts::Max(max_retries) = job.retry {
max_retries
} else {
worker.max_retries()
};

let err = {
match chain.next(job, worker, redis.clone()).await {
Expand All @@ -180,7 +186,7 @@ impl ServerMiddleware for RetryMiddleware {
job.retry_count = Some(retry_count);

// Attempt the retry.
if retry_count > max_retries {
if retry_count > max_retries || job.retry == RetryOpts::Never {
error!({
"status" = "fail",
"class" = &job.class,
Expand All @@ -207,7 +213,7 @@ impl ServerMiddleware for RetryMiddleware {
#[cfg(test)]
mod test {
use super::*;
use crate::{RedisConnectionManager, RedisPool, Worker};
use crate::{RedisConnectionManager, RedisPool, RetryOpts, Worker};
use bb8::Pool;
use tokio::sync::Mutex;

Expand All @@ -221,7 +227,7 @@ mod test {
class: "TestWorker".into(),
queue: "default".into(),
args: vec![1337].into(),
retry: true,
retry: RetryOpts::Yes,
jid: crate::new_jid(),
created_at: 1337.0,
enqueued_at: None,
Expand Down
28 changes: 16 additions & 12 deletions src/periodic.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::Result;
use crate::{new_jid, Error, Job, Processor, RedisConnection, RedisPool, Worker};
use crate::{new_jid, Error, Job, Processor, RedisConnection, RedisPool, RetryOpts, Worker};
pub use cron_clock::{Schedule as Cron, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
Expand All @@ -19,7 +19,7 @@ pub struct Builder {
pub(crate) name: Option<String>,
pub(crate) queue: Option<String>,
pub(crate) args: Option<JsonValue>,
pub(crate) retry: Option<JsonValue>,
pub(crate) retry: Option<RetryOpts>,
pub(crate) cron: Cron,
}

Expand All @@ -40,6 +40,18 @@ impl Builder {
..self
}
}

#[must_use]
pub fn retry<RO>(self, retry: RO) -> Builder
where
RO: Into<RetryOpts>,
{
Self {
retry: Some(retry.into()),
..self
}
}

pub fn queue<S: Into<String>>(self, queue: S) -> Builder {
Builder {
queue: Some(queue.into()),
Expand All @@ -65,14 +77,6 @@ impl Builder {
})
}

#[must_use]
pub fn retry(self, retry: JsonValue) -> Self {
Self {
retry: Some(retry),
..self
}
}

pub async fn register<W, Args>(self, processor: &mut Processor, worker: W) -> Result<()>
where
Args: Sync + Send + for<'de> serde::Deserialize<'de> + 'static,
Expand Down Expand Up @@ -117,7 +121,7 @@ pub struct PeriodicJob {
pub(crate) cron: String,
pub(crate) queue: Option<String>,
pub(crate) args: Option<String>,
retry: Option<JsonValue>,
retry: Option<RetryOpts>,

#[serde(skip)]
cron_schedule: Option<Cron>,
Expand Down Expand Up @@ -191,7 +195,7 @@ impl PeriodicJob {
jid: new_jid(),
created_at: chrono::Utc::now().timestamp() as f64,
enqueued_at: None,
retry: self.retry.clone().unwrap_or(JsonValue::Bool(false)),
retry: self.retry.clone().unwrap_or(RetryOpts::Never),
args,

// Make default eventually...
Expand Down
2 changes: 1 addition & 1 deletion src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ impl StatsPublisher {
pub async fn publish_stats(&self, redis: RedisPool) -> Result<(), Box<dyn std::error::Error>> {
let stats = self.create_process_stats().await?;
let mut conn = redis.get().await?;
let _ : () = conn.cmd_with_key("HSET", self.identity.clone())
conn.cmd_with_key("HSET", self.identity.clone())
.arg("rss")
.arg(stats.rss)
.arg("rtt_us")
Expand Down

0 comments on commit 291b99d

Please sign in to comment.