Merge pull request #54 from film42/gt/retry_queue
Add support for a retry_queue + tests
film42 authored Oct 14, 2024
2 parents b974400 + 18622c4 commit de5c24d
Showing 4 changed files with 76 additions and 0 deletions.
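
In short: enqueue options gain a retry_queue, and when a job that still has retries left fails, the retry middleware re-enqueues it onto that queue instead of its original one. A minimal usage sketch, modeled on the builder chain in the new test at the bottom of this diff (MyWorker and the queue names are hypothetical):

    // Sketch: enqueue a job whose retries should drain from a dedicated queue.
    let job = MyWorker::opts()
        .queue("orders")             // queue for the first attempt
        .retry(5)                    // up to 5 retry attempts
        .retry_queue("orders_retry") // failed attempts are re-enqueued here
        .perform_async(&redis, ())
        .await?;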
25 changes: 25 additions & 0 deletions src/lib.rs
@@ -62,13 +62,15 @@ pub fn opts() -> EnqueueOpts {
queue: "default".into(),
retry: RetryOpts::Yes,
unique_for: None,
retry_queue: None,
}
}

pub struct EnqueueOpts {
queue: String,
retry: RetryOpts,
unique_for: Option<std::time::Duration>,
retry_queue: Option<String>,
}

impl EnqueueOpts {
@@ -99,6 +101,14 @@ impl EnqueueOpts {
}
}

#[must_use]
pub fn retry_queue(self, retry_queue: String) -> Self {
Self {
retry_queue: Some(retry_queue),
..self
}
}

pub fn create_job(&self, class: String, args: impl serde::Serialize) -> Result<Job> {
let args = serde_json::to_value(args)?;

@@ -120,11 +130,13 @@ impl EnqueueOpts

// Make default eventually...
error_message: None,
error_class: None,
failed_at: None,
retry_count: None,
retried_at: None,

// Meta for enqueueing
retry_queue: self.retry_queue.clone(),
unique_for: self.unique_for,
})
}
@@ -191,6 +203,7 @@ pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> {
args: PhantomData<Args>,
worker: PhantomData<W>,
unique_for: Option<std::time::Duration>,
retry_queue: Option<String>,
}

impl<Args, W> WorkerOpts<Args, W>
@@ -205,6 +218,7 @@ where
args: PhantomData,
worker: PhantomData,
unique_for: None,
retry_queue: None,
}
}

@@ -219,6 +233,14 @@ where
}
}

#[must_use]
pub fn retry_queue<S: Into<String>>(self, retry_queue: S) -> Self {
Self {
retry_queue: Some(retry_queue.into()),
..self
}
}

#[must_use]
pub fn queue<S: Into<String>>(self, queue: S) -> Self {
Self {
@@ -268,6 +290,7 @@ impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts {
retry: opts.retry.clone(),
queue: opts.queue.clone(),
unique_for: opts.unique_for,
retry_queue: opts.retry_queue.clone(),
}
}
}
@@ -511,8 +534,10 @@ pub struct Job {
pub enqueued_at: Option<f64>,
pub failed_at: Option<f64>,
pub error_message: Option<String>,
pub error_class: Option<String>,
pub retry_count: Option<usize>,
pub retried_at: Option<f64>,
pub retry_queue: Option<String>,

#[serde(skip)]
pub unique_for: Option<std::time::Duration>,
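
The ad-hoc enqueue path gets the same option. A sketch, assuming the crate is used under the name sidekiq and the free function opts() shown above; note that this builder takes an owned String, unlike the WorkerOpts variant, which takes impl Into<String>, and that create_job only builds the Job payload:

    // Sketch: same retry_queue option on the ad-hoc EnqueueOpts builder.
    let job = sidekiq::opts()
        .queue("orders")
        .retry_queue("orders_retry".to_string())
        .create_job("OrderWorker".to_string(), serde_json::json!([]))?;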
8 changes: 8 additions & 0 deletions src/middleware.rs
@@ -200,9 +200,15 @@ impl ServerMiddleware for RetryMiddleware {
"class" = &job.class,
"jid" = &job.jid,
"queue" = &job.queue,
"retry_queue" = &job.retry_queue,
"err" = &job.error_message
}, "Scheduling job for retry in the future");

// Re-route the retry to the configured retry_queue, if one is set.
if let Some(ref retry_queue) = job.retry_queue {
job.queue = retry_queue.into();
}

UnitOfWork::from_job(job).reenqueue(&redis).await?;
}

@@ -233,8 +239,10 @@ mod test {
enqueued_at: None,
failed_at: None,
error_message: None,
error_class: None,
retry_count: None,
retried_at: None,
retry_queue: None,
unique_for: None,
}
}
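One operational point implied by this change: a job re-enqueued onto its retry_queue is only picked up by a processor that listens on that queue (the new test below runs a second processor for exactly this reason). A sketch, assuming the Processor::new(pool, queues) constructor from the crate's README (queue names hypothetical):

    // One processor can drain both the primary queue and its retry queue.
    let mut p = Processor::new(
        redis.clone(),
        vec!["orders".to_string(), "orders_retry".to_string()],
    );
    p.register(MyWorker);
    p.run().await;
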
2 changes: 2 additions & 0 deletions src/periodic.rs
@@ -200,9 +200,11 @@ impl PeriodicJob {

// Make default eventually...
error_message: None,
error_class: None,
failed_at: None,
retry_count: None,
retried_at: None,
retry_queue: None,

// Metadata not used in periodic jobs right now...
unique_for: None,
41 changes: 41 additions & 0 deletions tests/server_middleware_test.rs
@@ -289,4 +289,45 @@ mod test {

assert_eq!(n_jobs_retried, 0, "no jobs in the retry queue");
}

#[tokio::test]
#[serial]
async fn can_retry_job_into_different_retry_queue() {
let worker = AlwaysFailWorker;
let queue = "failure_zone_max_on_job".to_string();
let retry_queue = "the_retry_queue".to_string();
let (mut p, redis) = new_base_processor(queue.clone()).await;
let (mut retry_p, _retry_redis) = new_base_processor(retry_queue.clone()).await;
p.register(worker.clone());

let mut job = AlwaysFailWorker::opts()
.queue(queue)
.retry(5)
.retry_queue(&retry_queue)
.perform_async(&redis, ())
.await
.expect("enqueues");

assert_eq!(p.process_one_tick_once().await.unwrap(), WorkFetcher::Done);
let sets = vec!["retry".to_string()];
let sched = Scheduled::new(redis.clone());
let future_date = chrono::Utc::now() + chrono::Duration::days(30);

// We should have one job that needs retrying.
let n_jobs_retried = sched.enqueue_jobs(future_date, &sets).await;
assert!(n_jobs_retried.is_ok());
let n_jobs_retried = n_jobs_retried.unwrap();
assert_eq!(n_jobs_retried, 1, "we have one job to retry in the queue");

// Let's grab that job.
let job = retry_p.fetch().await;
assert!(job.is_ok());
let job = job.unwrap();
assert!(job.is_some());
let job = job.unwrap();

assert_eq!(job.job.class, "AlwaysFailWorker");
assert_eq!(job.job.retry_queue, Some(retry_queue));
assert_eq!(job.job.retry_count, Some(1));
}
}
