Skip to content

Commit

Permalink
Add a descriptive name field to the job (#15)
Browse files Browse the repository at this point in the history
* Add a descriptive name field to the job

This name does not have to be unique, and allows easier lookup of the
job or a specific set of jobs.

* changelog

* update name in recurring job data

* update changelog

* cleanup
  • Loading branch information
dimfeld authored Mar 28, 2024
1 parent 63ec703 commit 73e9c4e
Show file tree
Hide file tree
Showing 12 changed files with 239 additions and 101 deletions.
4 changes: 4 additions & 0 deletions effectum/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 0.6.0

- Jobs can now have a name, which can be used as an argument to `get_jobs_by_name`. The name does not have to be unique.

# 0.5.1

- Allow configuring whether to use the `Display` or `Debug` formatter to format failure information.
Expand Down
7 changes: 1 addition & 6 deletions effectum/examples/recurring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,13 @@
use std::{
path::Path,
sync::{
atomic::{AtomicI64, Ordering},
Arc, Mutex,
},
sync::{Arc, Mutex},
time::Duration,
};

use ahash::HashMap;
use effectum::{Job, JobRunner, Queue, RecurringJobSchedule, RunningJob};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use tracing::{event, Level};
use tracing_subscriber::{layer::SubscriberExt, EnvFilter};

#[derive(Debug)]
Expand Down
6 changes: 6 additions & 0 deletions effectum/migrations/00003-job-name-column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
ALTER TABLE jobs
ADD COLUMN name text;

CREATE INDEX jobs_name_added_at ON jobs (name, added_at DESC)
WHERE
name IS NOT NULL;
18 changes: 18 additions & 0 deletions effectum/src/add_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ pub struct Job {
pub id: Uuid,
/// The name of the job, which matches the name used in the [JobRunner](crate::JobRunner) for the job.
pub job_type: Cow<'static, str>,
/// A description for this job which can be passed to [Queue::get_jobs_by_name]. This value does not
/// have to be unique among all jobs.
pub name: Option<String>,
/// Jobs with higher `priority` will be executed first.
pub priority: i32,
/// Jobs that are expected to take more processing resources can be given a higher weight
Expand Down Expand Up @@ -98,6 +101,7 @@ impl Default for Job {
Self {
id: Uuid::now_v7(),
job_type: Default::default(),
name: None,
priority: 0,
weight: 1,
run_at: Default::default(),
Expand Down Expand Up @@ -126,6 +130,20 @@ impl JobBuilder {
}
}

/// Set the name of this job. This name is purely informational, and does not have to be unique. Jobs can be fetched by
/// their name using [Queue::get_jobs_by_name].
pub fn name(mut self, name: impl ToString) -> Self {
self.job.name = Some(name.to_string());
self
}

/// Set the name of this job. This name is purely informational, and does not have to be unique. Jobs can be fetched by
/// their name using [Queue::get_jobs_by_name].
pub fn name_opt(mut self, name: Option<String>) -> Self {
self.job.name = name.map(|n| n.to_string());
self
}

/// Set the priority of the job.
pub fn priority(mut self, priority: i32) -> Self {
self.job.priority = priority;
Expand Down
5 changes: 3 additions & 2 deletions effectum/src/db_writer/add_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ pub(crate) struct AddMultipleJobsArgs {

pub(super) const INSERT_JOBS_QUERY: &str = r##"
INSERT INTO jobs
(external_id, job_type, status, priority, weight, from_base_job, orig_run_at, payload,
(external_id, job_type, name, status, priority, weight, from_base_job, orig_run_at, payload,
max_retries, backoff_multiplier, backoff_randomization, backoff_initial_interval,
added_at, default_timeout, heartbeat_increment, run_info)
VALUES
($external_id, $job_type, $status, $priority, $weight, $from_base_job, $run_at, $payload,
($external_id, $job_type, $name, $status, $priority, $weight, $from_base_job, $run_at, $payload,
$max_retries, $backoff_multiplier, $backoff_randomization, $backoff_initial_interval,
$added_at, $default_timeout, $heartbeat_increment, '[]')
"##;
Expand All @@ -52,6 +52,7 @@ pub(super) fn execute_add_job_stmt(
jobs_stmt.execute(named_params! {
"$external_id": &job_config.id,
"$job_type": job_config.job_type,
"$name": job_config.name,
"$priority": job_config.priority,
"$weight": job_config.weight,
"$from_base_job": job_config.from_recurring,
Expand Down
7 changes: 6 additions & 1 deletion effectum/src/db_writer/ready_jobs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ fn do_get_ready_jobs(
backoff_randomization,
backoff_initial_interval,
max_retries,
orig_run_at
orig_run_at,
jobs.name
FROM active_jobs
JOIN jobs USING(job_id)
WHERE active_worker_id IS NULL
Expand All @@ -66,6 +67,7 @@ fn do_get_ready_jobs(
struct JobResult {
job_id: i64,
external_id: Uuid,
name: Option<String>,
priority: i32,
weight: u16,
job_type: String,
Expand Down Expand Up @@ -103,6 +105,7 @@ fn do_get_ready_jobs(
let backoff_initial_interval: i32 = row.get(11)?;
let max_retries: i32 = row.get(12)?;
let orig_run_at: i64 = row.get(13)?;
let name: Option<String> = row.get(14)?;

Ok(JobResult {
job_id,
Expand All @@ -119,6 +122,7 @@ fn do_get_ready_jobs(
backoff_initial_interval,
max_retries,
orig_run_at,
name,
})
},
)?;
Expand Down Expand Up @@ -160,6 +164,7 @@ fn do_get_ready_jobs(
let job = RunningJob(Arc::new(RunningJobData {
id: job.external_id,
job_id: job.job_id,
name: job.name,
worker_id,
heartbeat_increment: job.heartbeat_increment,
job_type: job.job_type,
Expand Down
8 changes: 6 additions & 2 deletions effectum/src/db_writer/recurring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,8 @@ fn update_existing_recurring_job(
backoff_randomization = ?8,
backoff_initial_interval = ?9,
default_timeout = ?10,
heartbeat_increment = ?11
heartbeat_increment = ?11,
name = ?12
WHERE job_id=?1"##,
)?;
base_update_stmt.execute(params![
Expand All @@ -234,6 +235,7 @@ fn update_existing_recurring_job(
job.retries.backoff_initial_interval.as_secs(),
job.timeout.as_secs(),
job.heartbeat_increment.as_secs(),
job.name,
])?;

// Update any pending jobs
Expand All @@ -250,7 +252,8 @@ fn update_existing_recurring_job(
backoff_randomization = ?,
backoff_initial_interval = ?,
default_timeout = ?,
heartbeat_increment = ?
heartbeat_increment = ?,
name = ?
WHERE from_base_job = ? AND status = 'pending'
RETURNING job_id"##,
)?;
Expand All @@ -269,6 +272,7 @@ fn update_existing_recurring_job(
job.retries.backoff_initial_interval.as_secs(),
job.timeout.as_secs(),
job.heartbeat_increment.as_secs(),
job.name,
base_job_id,
],
|row| row.get::<_, rusqlite::types::Value>(0),
Expand Down
13 changes: 11 additions & 2 deletions effectum/src/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub struct RunningJobData {
/// The id of this job.
pub id: Uuid,
pub(crate) job_id: i64,
/// The name given to this job
pub name: Option<String>,
/// The ID of the [Worker](crate::worker::Worker) that is running this job.
pub worker_id: WorkerId,
/// How many seconds a heartbeat can extend the expiration time.
Expand Down Expand Up @@ -83,6 +85,7 @@ impl Debug for RunningJobData {
f.debug_struct("Job")
.field("id", &self.id)
.field("job_id", &self.job_id)
.field("name", &self.name)
.field("worker_id", &self.worker_id)
.field("heartbeat_increment", &self.heartbeat_increment)
.field("job_type", &self.job_type)
Expand Down Expand Up @@ -111,8 +114,14 @@ impl Display for RunningJobData {

write!(
f,
"Job {{ id: {}, job_type: {}, priority: {}, start_time: {}, expires: {}, try: {} }}",
self.id, self.job_type, self.priority, self.start_time, expires, self.current_try
"Job {{ id: {}, name: {}, job_type: {}, priority: {}, start_time: {}, expires: {}, try: {} }}",
self.id,
self.name.as_deref().unwrap_or("_"),
self.job_type,
self.priority,
self.start_time,
expires,
self.current_try
)
}
}
Expand Down
Loading

0 comments on commit 73e9c4e

Please sign in to comment.