diff --git a/src/token-processor/queue/job-queue.ts b/src/token-processor/queue/job-queue.ts index 08f31fab..9bd58250 100644 --- a/src/token-processor/queue/job-queue.ts +++ b/src/token-processor/queue/job-queue.ts @@ -35,6 +35,7 @@ export class JobQueue { private readonly db: PgStore; /** IDs of jobs currently being processed by the queue. */ private jobIds: Set; + private isRunning = false; constructor(args: { db: PgStore }) { this.db = args.db; @@ -51,18 +52,19 @@ export class JobQueue { */ start() { console.log(`JobQueue starting queue...`); + this.isRunning = true; this.queue.start(); void this.runQueueLoop(); } /** - * Shuts down the queue by clearing it and waiting for its current work to be complete. + * Shuts down the queue and waits for its current work to be complete. */ async close() { console.log(`JobQueue closing, waiting on ${this.queue.pending} jobs to finish...`); - this.queue.clear(); - this.queue.pause(); + this.isRunning = false; await this.queue.onIdle(); + this.queue.pause(); } /** @@ -72,12 +74,11 @@ export class JobQueue { * @param job - A row from the `jobs` DB table that needs processing */ protected async add(job: DbJob): Promise { - if (this.queue.size + this.queue.pending >= ENV.JOB_QUEUE_SIZE_LIMIT) { - // To avoid memory errors, we won't add this job to the queue. It will be processed later when - // the empty queue gets replenished with pending jobs. - return; - } - if (this.jobIds.has(job.id)) { + if ( + !this.isRunning || + this.jobIds.has(job.id) || + this.queue.size + this.queue.pending >= ENV.JOB_QUEUE_SIZE_LIMIT + ) { return; } this.jobIds.add(job.id); @@ -124,7 +125,7 @@ export class JobQueue { * processing, repeating this cycle until the jobs table is completely processed. */ private async runQueueLoop() { - while (!this.queue.isPaused) { + while (this.isRunning) { try { const loadedJobs = await this.addJobBatch(); if (loadedJobs === 0) { diff --git a/tests/job-queue.test.ts b/tests/job-queue.test.ts index 2b1d0891..7d60044c 100644 --- a/tests/job-queue.test.ts +++ b/tests/job-queue.test.ts @@ -5,6 +5,10 @@ import { JobQueue } from '../src/token-processor/queue/job-queue'; import { cycleMigrations } from './helpers'; class TestJobQueue extends JobQueue { + constructor(args: { db: PgStore }) { + super(args); + this['isRunning'] = true; // Simulate a running queue. + } async testAdd(job: DbJob): Promise { return this.add(job); }