Skip to content

Commit

Permalink
feat: support delay and concurrency, graceful shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
adiologydev committed Aug 30, 2024
1 parent 181213b commit 132c392
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 5 deletions.
14 changes: 14 additions & 0 deletions apps/example/src/jobs/DelayedJob.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Job } from "tinyjobs";

export default class DelayedJob extends Job {
constructor() {
super({
name: "DelayedJob",
delay: 5000,
});
}

async run({ name }: { name: string }) {
console.log(`Hello from DelayedJob ${name}!`);
}
}
2 changes: 2 additions & 0 deletions apps/example/src/jobs/jobs.types.d.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
interface cronExampleParams { name: string };
interface DelayedJobParams { name: string };
interface exampleJobParams { name: string };

type TinyJobsTypes = {
'cronExample': cronExampleParams;
'DelayedJob': DelayedJobParams;
'exampleJob': exampleJobParams
};

Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
"scripts": {
"build": "turbo build",
"dev": "turbo dev",
"dev:packages": "turbo dev --filter=./packages/*",
"dev:apps": "turbo dev --filter=./apps/*",
"lint": "turbo lint",
"format": "prettier --write \"**/*.{ts,tsx,md}\"",
"ci:version": "bunx changeset version && bun install",
Expand Down
40 changes: 36 additions & 4 deletions packages/tinyjobs/src/lib/TinyJobsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,35 +11,54 @@ type TinyJobsConstructorTypes = {
connection?: ConnectionOptions;
queueOptions?: JobsOptions;
queueName?: string;
concurrency?: number;
};

type JobsMap = Map<
string,
{
implementation: new () => TinyJob;
cron?: string;
delay?: number;
}
>;

class TinyJobs<T> {
private queue: Queue;
private worker: Worker;
private jobs: JobsMap = new Map();
private worker: Worker;

private readonly removeOnComplete = true;
private readonly removeOnFailure = true;

/**
* Creates an instance of TinyJobs.
* @param {ConnectionOptions} [connection] The connection options for the queue.
* @param {JobsOptions} [queueOptions] The options for the queue.
* @param {string} [queueName] The name of the queue.
* @param {number} [concurrency] The number of jobs to process concurrently.
* @memberof TinyJobs
*/
constructor(tinyJobsParams?: TinyJobsConstructorTypes) {
const {
connection,
queueOptions,
queueName = `tjq-${generateRandomUid()}`,
concurrency,
} = tinyJobsParams ?? {};

this.queue = new Queue(queueName, {
connection: connection ?? {},
...queueOptions,
});

this.worker = new Worker(queueName, this.processQueue.bind(this), {
connection: connection ?? {},
concurrency: concurrency ?? 1,
});

process.on("SIGINT", () => this.gracefulShutdown("SIGINT"));
process.on("SIGTERM", () => this.gracefulShutdown("SIGTERM"));
}

private async processQueue(job: BullJob) {
Expand All @@ -56,20 +75,25 @@ class TinyJobs<T> {
}

public async registerJob(job: new () => TinyJob) {
if (this.jobs.has(job.name)) {
if (this.jobs.has(job.name))
throw new Error(`Job with name ${job.name} already registered.`);
}

const implementation = new job();
this.jobs.set(job.name, {
implementation: job,
cron: implementation.cron,
delay: implementation.delay,
});

// Queue the job if it has a cron pattern so user doesn't have to do it manually
if (implementation.cron) {
await this.queue.add(implementation.name, undefined, {
repeat: {
pattern: implementation.cron,
},
removeOnComplete: this.removeOnComplete,
removeOnFail: this.removeOnFailure,
delay: implementation.delay,
});
}
}
Expand Down Expand Up @@ -102,17 +126,25 @@ class TinyJobs<T> {
options?: JobsOptions
) {
const job = this.jobs.get(jobName as string);
const cron = job?.cron;
const { cron, delay } = job ?? {};

return this.queue.add(jobName as string, data, {
repeat: cron
? {
pattern: cron,
}
: undefined,
delay: delay ?? undefined,
removeOnComplete: this.removeOnComplete,
removeOnFail: this.removeOnFailure,
...options,
});
}

private async gracefulShutdown(signal: NodeJS.Signals) {
await this.worker.close();
process.exit(signal === "SIGTERM" ? 0 : 1);
}
}

export default TinyJobs;
28 changes: 27 additions & 1 deletion packages/tinyjobs/src/structures/Job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,19 @@
class Job {
name: string;
cron?: string;
delay?: number;

constructor(options: { name: string; cron?: string }) {
/**
* Creates an instance of Job.
* @param {string} name The name of the job.
* @param {string} [cron] The cron pattern for the job.
* @param {number} [delay] The delay in milliseconds for the job.
* @memberof Job
*/
constructor(options: { name: string; cron?: string; delay?: number }) {
this.name = options.name;
this.cron = options.cron;
this.delay = options.delay;
}

async run(payload?: Record<string, unknown>) {
Expand All @@ -40,6 +49,23 @@ class Job {
static get cron(): string | undefined {
return this.cron;
}

static get delay(): number | undefined {
return this.delay;
}

static get concurrency(): number | undefined {
return this.concurrency;
}

static toJSON() {
return {
name: this.name,
cron: this.cron,
delay: this.delay,
concurrency: this.concurrency,
};
}
}

export default Job;

0 comments on commit 132c392

Please sign in to comment.