From 73e9c4e541042e99514dd12ff27626c6df88b1e8 Mon Sep 17 00:00:00 2001 From: Daniel Imfeld Date: Wed, 27 Mar 2024 19:08:13 -1000 Subject: [PATCH] Add a descriptive name field to the job (#15) * Add a descriptive name field to the job This name does not have to be unique, and allows easier lookup of the job or a specific set of jobs. * changelog * update name in recurring job data * update changelog * cleanup --- effectum/CHANGELOG.md | 4 + effectum/examples/recurring.rs | 7 +- effectum/migrations/00003-job-name-column.sql | 6 + effectum/src/add_job.rs | 18 ++ effectum/src/db_writer/add_job.rs | 5 +- effectum/src/db_writer/ready_jobs.rs | 7 +- effectum/src/db_writer/recurring.rs | 8 +- effectum/src/job.rs | 13 +- effectum/src/job_status.rs | 178 ++++++++++-------- effectum/src/local_queue.rs | 46 +++++ effectum/src/migrations.rs | 3 +- effectum/src/recurring.rs | 45 ++++- 12 files changed, 239 insertions(+), 101 deletions(-) create mode 100644 effectum/migrations/00003-job-name-column.sql diff --git a/effectum/CHANGELOG.md b/effectum/CHANGELOG.md index a550e29..dd213e1 100644 --- a/effectum/CHANGELOG.md +++ b/effectum/CHANGELOG.md @@ -1,3 +1,7 @@ +# 0.6.0 + +- Jobs can now have a name, which can be used as an argument to `get_jobs_by_name`. The name does not have to be unique. + # 0.5.1 - Allow configuring whether to use the `Display` or `Debug` formatter to format failure information. diff --git a/effectum/examples/recurring.rs b/effectum/examples/recurring.rs index 2bf9700..3666d71 100644 --- a/effectum/examples/recurring.rs +++ b/effectum/examples/recurring.rs @@ -3,18 +3,13 @@ use std::{ path::Path, - sync::{ - atomic::{AtomicI64, Ordering}, - Arc, Mutex, - }, + sync::{Arc, Mutex}, time::Duration, }; use ahash::HashMap; use effectum::{Job, JobRunner, Queue, RecurringJobSchedule, RunningJob}; -use serde::{Deserialize, Serialize}; use time::OffsetDateTime; -use tracing::{event, Level}; use tracing_subscriber::{layer::SubscriberExt, EnvFilter}; #[derive(Debug)] diff --git a/effectum/migrations/00003-job-name-column.sql b/effectum/migrations/00003-job-name-column.sql new file mode 100644 index 0000000..7b3adb8 --- /dev/null +++ b/effectum/migrations/00003-job-name-column.sql @@ -0,0 +1,6 @@ +ALTER TABLE jobs + ADD COLUMN name text; + +CREATE INDEX jobs_name_added_at ON jobs (name, added_at DESC) +WHERE + name IS NOT NULL; diff --git a/effectum/src/add_job.rs b/effectum/src/add_job.rs index 988c3a0..6a3ba54 100644 --- a/effectum/src/add_job.rs +++ b/effectum/src/add_job.rs @@ -28,6 +28,9 @@ pub struct Job { pub id: Uuid, /// The name of the job, which matches the name used in the [JobRunner](crate::JobRunner) for the job. pub job_type: Cow<'static, str>, + /// A description for this job which can be passed to [Queue::get_jobs_by_name]. This value does not + /// have to be unique among all jobs. + pub name: Option, /// Jobs with higher `priority` will be executed first. pub priority: i32, /// Jobs that are expected to take more processing resources can be given a higher weight @@ -98,6 +101,7 @@ impl Default for Job { Self { id: Uuid::now_v7(), job_type: Default::default(), + name: None, priority: 0, weight: 1, run_at: Default::default(), @@ -126,6 +130,20 @@ impl JobBuilder { } } + /// Set the name of this job. This name is purely informational, and does not have to be unique. Jobs can be fetched by + /// their name using [Queue::get_jobs_by_name]. + pub fn name(mut self, name: impl ToString) -> Self { + self.job.name = Some(name.to_string()); + self + } + + /// Set the name of this job. This name is purely informational, and does not have to be unique. Jobs can be fetched by + /// their name using [Queue::get_jobs_by_name]. + pub fn name_opt(mut self, name: Option) -> Self { + self.job.name = name.map(|n| n.to_string()); + self + } + /// Set the priority of the job. pub fn priority(mut self, priority: i32) -> Self { self.job.priority = priority; diff --git a/effectum/src/db_writer/add_job.rs b/effectum/src/db_writer/add_job.rs index 950a483..9960f9e 100644 --- a/effectum/src/db_writer/add_job.rs +++ b/effectum/src/db_writer/add_job.rs @@ -24,11 +24,11 @@ pub(crate) struct AddMultipleJobsArgs { pub(super) const INSERT_JOBS_QUERY: &str = r##" INSERT INTO jobs - (external_id, job_type, status, priority, weight, from_base_job, orig_run_at, payload, + (external_id, job_type, name, status, priority, weight, from_base_job, orig_run_at, payload, max_retries, backoff_multiplier, backoff_randomization, backoff_initial_interval, added_at, default_timeout, heartbeat_increment, run_info) VALUES - ($external_id, $job_type, $status, $priority, $weight, $from_base_job, $run_at, $payload, + ($external_id, $job_type, $name, $status, $priority, $weight, $from_base_job, $run_at, $payload, $max_retries, $backoff_multiplier, $backoff_randomization, $backoff_initial_interval, $added_at, $default_timeout, $heartbeat_increment, '[]') "##; @@ -52,6 +52,7 @@ pub(super) fn execute_add_job_stmt( jobs_stmt.execute(named_params! { "$external_id": &job_config.id, "$job_type": job_config.job_type, + "$name": job_config.name, "$priority": job_config.priority, "$weight": job_config.weight, "$from_base_job": job_config.from_recurring, diff --git a/effectum/src/db_writer/ready_jobs.rs b/effectum/src/db_writer/ready_jobs.rs index 8b6ae17..1ae7441 100644 --- a/effectum/src/db_writer/ready_jobs.rs +++ b/effectum/src/db_writer/ready_jobs.rs @@ -51,7 +51,8 @@ fn do_get_ready_jobs( backoff_randomization, backoff_initial_interval, max_retries, - orig_run_at + orig_run_at, + jobs.name FROM active_jobs JOIN jobs USING(job_id) WHERE active_worker_id IS NULL @@ -66,6 +67,7 @@ fn do_get_ready_jobs( struct JobResult { job_id: i64, external_id: Uuid, + name: Option, priority: i32, weight: u16, job_type: String, @@ -103,6 +105,7 @@ fn do_get_ready_jobs( let backoff_initial_interval: i32 = row.get(11)?; let max_retries: i32 = row.get(12)?; let orig_run_at: i64 = row.get(13)?; + let name: Option = row.get(14)?; Ok(JobResult { job_id, @@ -119,6 +122,7 @@ fn do_get_ready_jobs( backoff_initial_interval, max_retries, orig_run_at, + name, }) }, )?; @@ -160,6 +164,7 @@ fn do_get_ready_jobs( let job = RunningJob(Arc::new(RunningJobData { id: job.external_id, job_id: job.job_id, + name: job.name, worker_id, heartbeat_increment: job.heartbeat_increment, job_type: job.job_type, diff --git a/effectum/src/db_writer/recurring.rs b/effectum/src/db_writer/recurring.rs index de1ede0..3eacf3a 100644 --- a/effectum/src/db_writer/recurring.rs +++ b/effectum/src/db_writer/recurring.rs @@ -219,7 +219,8 @@ fn update_existing_recurring_job( backoff_randomization = ?8, backoff_initial_interval = ?9, default_timeout = ?10, - heartbeat_increment = ?11 + heartbeat_increment = ?11, + name = ?12 WHERE job_id=?1"##, )?; base_update_stmt.execute(params![ @@ -234,6 +235,7 @@ fn update_existing_recurring_job( job.retries.backoff_initial_interval.as_secs(), job.timeout.as_secs(), job.heartbeat_increment.as_secs(), + job.name, ])?; // Update any pending jobs @@ -250,7 +252,8 @@ fn update_existing_recurring_job( backoff_randomization = ?, backoff_initial_interval = ?, default_timeout = ?, - heartbeat_increment = ? + heartbeat_increment = ?, + name = ? WHERE from_base_job = ? AND status = 'pending' RETURNING job_id"##, )?; @@ -269,6 +272,7 @@ fn update_existing_recurring_job( job.retries.backoff_initial_interval.as_secs(), job.timeout.as_secs(), job.heartbeat_increment.as_secs(), + job.name, base_job_id, ], |row| row.get::<_, rusqlite::types::Value>(0), diff --git a/effectum/src/job.rs b/effectum/src/job.rs index 7fdd89b..ae80b79 100644 --- a/effectum/src/job.rs +++ b/effectum/src/job.rs @@ -47,6 +47,8 @@ pub struct RunningJobData { /// The id of this job. pub id: Uuid, pub(crate) job_id: i64, + /// The name given to this job + pub name: Option, /// The ID of the [Worker](crate::worker::Worker) that is running this job. pub worker_id: WorkerId, /// How many seconds a heartbeat can extend the expiration time. @@ -83,6 +85,7 @@ impl Debug for RunningJobData { f.debug_struct("Job") .field("id", &self.id) .field("job_id", &self.job_id) + .field("name", &self.name) .field("worker_id", &self.worker_id) .field("heartbeat_increment", &self.heartbeat_increment) .field("job_type", &self.job_type) @@ -111,8 +114,14 @@ impl Display for RunningJobData { write!( f, - "Job {{ id: {}, job_type: {}, priority: {}, start_time: {}, expires: {}, try: {} }}", - self.id, self.job_type, self.priority, self.start_time, expires, self.current_try + "Job {{ id: {}, name: {}, job_type: {}, priority: {}, start_time: {}, expires: {}, try: {} }}", + self.id, + self.name.as_deref().unwrap_or("_"), + self.job_type, + self.priority, + self.start_time, + expires, + self.current_try ) } } diff --git a/effectum/src/job_status.rs b/effectum/src/job_status.rs index 78731af..fc74526 100644 --- a/effectum/src/job_status.rs +++ b/effectum/src/job_status.rs @@ -85,6 +85,8 @@ impl FromStr for JobState { pub struct JobStatus { /// The job's ID. pub id: Uuid, + /// The name of the job. + pub name: Option, /// The type of a job pub job_type: String, /// If the job is waiting, running, or finished @@ -129,6 +131,7 @@ pub struct NumActiveJobs { pub(crate) enum JobIdQuery { Id(i64), + Name(String), ExternalId(Uuid), } @@ -136,12 +139,14 @@ impl Queue { pub(crate) fn run_job_status_query( conn: &rusqlite::Connection, id: JobIdQuery, - ) -> Result { + limit: usize, + ) -> Result, Error> { let (id_column, job_id) = match id { JobIdQuery::Id(id) => ("job_id", rusqlite::types::Value::from(id)), JobIdQuery::ExternalId(external_id) => { ("external_id", rusqlite::types::Value::from(external_id)) } + JobIdQuery::Name(name) => ("name", rusqlite::types::Value::from(name)), }; let mut stmt = conn.prepare_cached( @@ -157,94 +162,98 @@ impl Queue { max_retries, backoff_multiplier, backoff_randomization, backoff_initial_interval, added_at, COALESCE(active_jobs.started_at, jobs.started_at) AS started_at, - finished_at, expires_at, run_info + finished_at, expires_at, run_info, name FROM jobs LEFT JOIN active_jobs USING(job_id) WHERE {}=?1 + ORDER BY added_at DESC + LIMIT ?2 "##, id_column ))?; - let mut rows = stmt.query_and_then([job_id], |row| { - let started_at = row - .get_ref(14)? - .as_i64_or_null() - .map_err(|e| Error::ColumnType(e.into(), "started_at"))? - .map(|i| { - OffsetDateTime::from_unix_timestamp(i) - .map_err(|_| Error::TimestampOutOfRange("started_at")) - }) - .transpose()?; - - let finished_at = row - .get_ref(15)? - .as_i64_or_null() - .map_err(|e| Error::ColumnType(e.into(), "finished_at"))? - .map(|i| { - OffsetDateTime::from_unix_timestamp(i) - .map_err(|_| Error::TimestampOutOfRange("finished_at")) - }) - .transpose()?; - - let expires_at = row - .get_ref(16)? - .as_i64_or_null() - .map_err(|e| Error::ColumnType(e.into(), "expires_at"))? - .map(|i| { - OffsetDateTime::from_unix_timestamp(i) - .map_err(|_| Error::TimestampOutOfRange("expires_at")) - }) - .transpose()?; + let rows = stmt.query_and_then( + [job_id, rusqlite::types::Value::from(limit as i32)], + |row| { + let started_at = row + .get_ref(14)? + .as_i64_or_null() + .map_err(|e| Error::ColumnType(e.into(), "started_at"))? + .map(|i| { + OffsetDateTime::from_unix_timestamp(i) + .map_err(|_| Error::TimestampOutOfRange("started_at")) + }) + .transpose()?; - let run_info_str = row - .get_ref(17)? - .as_str_or_null() - .map_err(|e| Error::ColumnType(e.into(), "run_info"))?; - let run_info: SmallVec<[RunInfo>; 4]> = match run_info_str { - Some(run_info_str) => { - serde_json::from_str(run_info_str).map_err(Error::InvalidJobRunInfo)? - } - None => SmallVec::new(), - }; + let finished_at = row + .get_ref(15)? + .as_i64_or_null() + .map_err(|e| Error::ColumnType(e.into(), "finished_at"))? + .map(|i| { + OffsetDateTime::from_unix_timestamp(i) + .map_err(|_| Error::TimestampOutOfRange("finished_at")) + }) + .transpose()?; - let status = JobStatus { - id: row.get(0).map_err(|e| Error::ColumnType(e, "id"))?, - job_type: row.get(1).map_err(|e| Error::ColumnType(e, "job_type"))?, - state: row - .get_ref(2)? - .as_str() - .map_err(|e| Error::ColumnType(e.into(), "state"))? - .parse()?, - priority: row.get(3)?, - weight: row.get(4)?, - orig_run_at: OffsetDateTime::from_unix_timestamp(row.get(5)?) - .map_err(|_| Error::TimestampOutOfRange("orig_run_at"))?, - run_at: row - .get_ref(6)? + let expires_at = row + .get_ref(16)? .as_i64_or_null() - .map_err(|e| Error::ColumnType(e.into(), "run_at"))? - .map(OffsetDateTime::from_unix_timestamp) - .transpose() - .map_err(|_| Error::TimestampOutOfRange("run_at"))?, - payload: row.get(7)?, - current_try: row.get(8)?, - max_retries: row.get(9)?, - backoff_multiplier: row.get(10)?, - backoff_randomization: row.get(11)?, - backoff_initial_interval: Duration::seconds(row.get(12)?), - added_at: OffsetDateTime::from_unix_timestamp(row.get(13)?) - .map_err(|_| Error::TimestampOutOfRange("added_at"))?, - started_at, - finished_at, - expires_at, - run_info, - }; + .map_err(|e| Error::ColumnType(e.into(), "expires_at"))? + .map(|i| { + OffsetDateTime::from_unix_timestamp(i) + .map_err(|_| Error::TimestampOutOfRange("expires_at")) + }) + .transpose()?; + + let run_info_str = row + .get_ref(17)? + .as_str_or_null() + .map_err(|e| Error::ColumnType(e.into(), "run_info"))?; + let run_info: SmallVec<[RunInfo>; 4]> = match run_info_str { + Some(run_info_str) => { + serde_json::from_str(run_info_str).map_err(Error::InvalidJobRunInfo)? + } + None => SmallVec::new(), + }; - Ok::<_, Error>(status) - })?; + let status = JobStatus { + id: row.get(0).map_err(|e| Error::ColumnType(e, "id"))?, + job_type: row.get(1).map_err(|e| Error::ColumnType(e, "job_type"))?, + state: row + .get_ref(2)? + .as_str() + .map_err(|e| Error::ColumnType(e.into(), "state"))? + .parse()?, + priority: row.get(3)?, + weight: row.get(4)?, + orig_run_at: OffsetDateTime::from_unix_timestamp(row.get(5)?) + .map_err(|_| Error::TimestampOutOfRange("orig_run_at"))?, + run_at: row + .get_ref(6)? + .as_i64_or_null() + .map_err(|e| Error::ColumnType(e.into(), "run_at"))? + .map(OffsetDateTime::from_unix_timestamp) + .transpose() + .map_err(|_| Error::TimestampOutOfRange("run_at"))?, + payload: row.get(7)?, + current_try: row.get(8)?, + max_retries: row.get(9)?, + backoff_multiplier: row.get(10)?, + backoff_randomization: row.get(11)?, + backoff_initial_interval: Duration::seconds(row.get(12)?), + added_at: OffsetDateTime::from_unix_timestamp(row.get(13)?) + .map_err(|_| Error::TimestampOutOfRange("added_at"))?, + started_at, + finished_at, + expires_at, + run_info, + name: row.get(18).map_err(|e| Error::ColumnType(e, "name"))?, + }; - let status = rows.next().ok_or(Error::NotFound)??; + Ok::<_, Error>(status) + }, + )?; - Ok(status) + rows.collect::, _>>() } /// Return information about a job @@ -253,11 +262,22 @@ impl Queue { let status = conn .interact(move |conn| { - Self::run_job_status_query(conn, JobIdQuery::ExternalId(external_id)) + Self::run_job_status_query(conn, JobIdQuery::ExternalId(external_id), 1) }) .await??; - Ok(status) + status.into_iter().next().ok_or(Error::NotFound) + } + + /// Get jobs by their name, ordered by the most recently added. + pub async fn get_jobs_by_name(&self, name: String, limit: usize) -> Result> { + let conn = self.state.read_conn_pool.get().await?; + + let rows = conn + .interact(move |conn| Self::run_job_status_query(conn, JobIdQuery::Name(name), limit)) + .await??; + + Ok(rows.into_vec()) } /// Return counts about the number of jobs running and waiting to run. diff --git a/effectum/src/local_queue.rs b/effectum/src/local_queue.rs index 6b94ac0..514fd02 100644 --- a/effectum/src/local_queue.rs +++ b/effectum/src/local_queue.rs @@ -264,6 +264,52 @@ mod tests { wait_for_job("job to run", &test.queue, job_id).await; } } + #[tokio::test] + async fn status_by_name() { + let test = TestEnvironment::new().await; + + let _worker = test.worker().build().await.expect("failed to build worker"); + + let ids = test + .queue + .add_jobs(vec![ + JobBuilder::new("counter") + .name("counter1".to_string()) + .build(), + JobBuilder::new("counter").name("counter2").build(), + JobBuilder::new("counter") + .name_opt(Some("counter1".to_string())) + .build(), + ]) + .await + .expect("failed to add job"); + + for job_id in &ids { + wait_for_job("job to run", &test.queue, *job_id).await; + } + + let jobs = test + .queue + .get_jobs_by_name("counter1".to_string(), 3) + .await + .unwrap() + .into_iter() + .map(|j| j.id) + .collect::>(); + + assert_eq!(jobs, vec![ids[0], ids[2]]); + + let jobs = test + .queue + .get_jobs_by_name("counter1".to_string(), 1) + .await + .unwrap() + .into_iter() + .map(|j| j.id) + .collect::>(); + + assert_eq!(jobs, vec![ids[0]]); + } #[tokio::test] async fn worker_gets_pending_jobs_when_starting() { diff --git a/effectum/src/migrations.rs b/effectum/src/migrations.rs index c07a206..cb1a0ac 100644 --- a/effectum/src/migrations.rs +++ b/effectum/src/migrations.rs @@ -3,9 +3,10 @@ use rusqlite_migration::{Migrations, M}; use crate::Result; -const MIGRATIONS: [&str; 2] = [ +const MIGRATIONS: [&str; 3] = [ include_str!("../migrations/00001-init.sql"), include_str!("../migrations/00002-rename-column.sql"), + include_str!("../migrations/00003-job-name-column.sql"), ]; fn create_migrations() -> Migrations<'static> { diff --git a/effectum/src/recurring.rs b/effectum/src/recurring.rs index 15c264e..ba17b8d 100644 --- a/effectum/src/recurring.rs +++ b/effectum/src/recurring.rs @@ -24,7 +24,7 @@ pub(crate) fn create_job_from_recurring_template( let query = r##"SELECT job_id, job_type, priority, weight, payload, max_retries, backoff_multiplier, backoff_randomization, backoff_initial_interval, - default_timeout, heartbeat_increment, schedule + default_timeout, heartbeat_increment, schedule, name FROM jobs JOIN recurring ON job_id = base_job_id WHERE status = 'recurring_base' AND job_id IN rarray(?) @@ -70,8 +70,15 @@ pub(crate) fn create_job_from_recurring_template( .map_err(|_| Error::InvalidSchedule) })?; + let name = row + .get_ref(12)? + .as_str_or_null() + .map_err(|e| Error::ColumnType(e.into(), "name"))? + .map(|s| s.to_string()); + let next_job_time = schedule.find_next_job_time(now, from_time)?; let job = JobBuilder::new(job_type) + .name_opt(name) .priority(priority) .weight(weight) .payload(payload) @@ -318,8 +325,14 @@ impl Queue { let schedule: RecurringJobSchedule = serde_json::from_str(&schedule).map_err(|_| Error::InvalidSchedule)?; - let base_job_info = - Self::run_job_status_query(db, crate::job_status::JobIdQuery::Id(base_job_id))?; + let base_job_info = Self::run_job_status_query( + db, + crate::job_status::JobIdQuery::Id(base_job_id), + 1, + )? + .into_iter() + .next() + .ok_or(Error::NotFound)?; let mut find_last_run_stmt = db.prepare_cached( r##"SELECT job_id @@ -334,10 +347,13 @@ impl Queue { .optional()?; let last_run = if let Some(last_run_id) = last_run_id { - Some(Self::run_job_status_query( + Self::run_job_status_query( db, crate::job_status::JobIdQuery::Id(last_run_id), - )?) + 1, + )? + .into_iter() + .next() } else { None }; @@ -415,6 +431,7 @@ mod tests { let test = TestEnvironment::new().await; let _worker = test.worker().build().await.expect("Failed to build worker"); let job = JobBuilder::new("counter") + .name("test") .json_payload(&serde_json::json!({ "value": 1 })) .expect("json_payload") .build(); @@ -486,6 +503,8 @@ mod tests { let result = wait_for_job("second run", &test.queue, second_job_id).await; assert!(result.started_at.expect("started_at") >= second_run_time); + assert_eq!(result.name, Some("test".to_string())); + assert_eq!( test.context .counter @@ -499,6 +518,7 @@ mod tests { let test = TestEnvironment::new().await; let _worker = test.worker().build().await.expect("Failed to build worker"); let job = JobBuilder::new("counter") + .name("testing") .json_payload(&serde_json::json!({ "value": 5 })) .expect("json_payload") .build(); @@ -588,6 +608,7 @@ mod tests { let result = wait_for_job("second run", &test.queue, second_job_id).await; assert!(result.started_at.expect("started_at") >= second_run_time); + assert_eq!(result.name, Some("testing".to_string())); assert_eq!( test.context .counter @@ -801,7 +822,7 @@ mod tests { async fn update_same_schedule() { let test = TestEnvironment::new().await; let _worker = test.worker().build().await.expect("Failed to build worker"); - let job = JobBuilder::new("counter") + let mut job = JobBuilder::new("counter") .json_payload(&serde_json::json!({ "value": 1 })) .expect("json_payload") .build(); @@ -825,6 +846,8 @@ mod tests { tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::resume(); + job.name = Some("new_name".to_string()); + test.queue .update_recurring_job("job_id".to_string(), schedule, job) .await @@ -837,6 +860,7 @@ mod tests { // The scheduled job and its time to run should be the same. assert_eq!(job_status.next_run, new_job_status.next_run); + assert_eq!(new_job_status.base_job.name, Some("new_name".to_string())); } #[tokio::test] @@ -1008,6 +1032,7 @@ mod tests { let test = TestEnvironment::new().await; let _worker = test.worker().build().await.expect("Failed to build worker"); let job = JobBuilder::new("counter") + .name("testing") .json_payload(&serde_json::json!({ "value": 1 })) .expect("json_payload") .build(); @@ -1078,6 +1103,7 @@ mod tests { let result = wait_for_job("second run", &test.queue, second_job_id).await; assert!(result.started_at.expect("started_at") >= second_run_time); + assert_eq!(result.name, Some("testing".to_string())); assert_eq!( test.context @@ -1112,6 +1138,7 @@ mod tests { assert!(job_status.next_run.is_some()); let new_job = JobBuilder::new("counter") + .name("testing") .json_payload(&serde_json::json!(2)) .expect("json_payload") .build(); @@ -1125,6 +1152,7 @@ mod tests { .await .expect("Retrieving job status"); let new_next_run = new_job_status.next_run.expect("next_run"); + assert_eq!(new_job_status.base_job.name, Some("testing".to_string())); // The scheduled job and its time to run should be the same. let payload: serde_json::Value = @@ -1132,7 +1160,7 @@ mod tests { assert_eq!(payload, serde_json::json!(2)); tokio::time::pause(); - wait_for_job("waiting for job to run", &test.queue, new_next_run.0).await; + let result = wait_for_job("waiting for job to run", &test.queue, new_next_run.0).await; assert_eq!( test.context .counter @@ -1140,6 +1168,7 @@ mod tests { 2, "task should have run with new payload" ); + assert_eq!(result.name, Some("testing".to_string())); } #[tokio::test] @@ -1230,7 +1259,7 @@ mod tests { .get_recurring_job_info("job_id".to_string()) .await .expect("Retrieving job status"); - let (first_job_id, first_run_at) = job_status.next_run.expect("next_run_at"); + let (first_job_id, _first_run_at) = job_status.next_run.expect("next_run_at"); wait_for_job("first run", &test.queue, first_job_id).await; assert_eq!(