From d38f887891d2e4983758d3a11058f47ed91cc6f8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Fri, 2 Dec 2022 11:32:06 -0600 Subject: [PATCH 1/2] fix: queue graceful shutdown --- src/token-processor/queue/job-queue.ts | 21 +++++++++++---------- tests/job-queue.test.ts | 1 + 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/token-processor/queue/job-queue.ts b/src/token-processor/queue/job-queue.ts index 08f31fab..9bd58250 100644 --- a/src/token-processor/queue/job-queue.ts +++ b/src/token-processor/queue/job-queue.ts @@ -35,6 +35,7 @@ export class JobQueue { private readonly db: PgStore; /** IDs of jobs currently being processed by the queue. */ private jobIds: Set; + private isRunning = false; constructor(args: { db: PgStore }) { this.db = args.db; @@ -51,18 +52,19 @@ export class JobQueue { */ start() { console.log(`JobQueue starting queue...`); + this.isRunning = true; this.queue.start(); void this.runQueueLoop(); } /** - * Shuts down the queue by clearing it and waiting for its current work to be complete. + * Shuts down the queue and waits for its current work to be complete. */ async close() { console.log(`JobQueue closing, waiting on ${this.queue.pending} jobs to finish...`); - this.queue.clear(); - this.queue.pause(); + this.isRunning = false; await this.queue.onIdle(); + this.queue.pause(); } /** @@ -72,12 +74,11 @@ export class JobQueue { * @param job - A row from the `jobs` DB table that needs processing */ protected async add(job: DbJob): Promise { - if (this.queue.size + this.queue.pending >= ENV.JOB_QUEUE_SIZE_LIMIT) { - // To avoid memory errors, we won't add this job to the queue. It will be processed later when - // the empty queue gets replenished with pending jobs. - return; - } - if (this.jobIds.has(job.id)) { + if ( + !this.isRunning || + this.jobIds.has(job.id) || + this.queue.size + this.queue.pending >= ENV.JOB_QUEUE_SIZE_LIMIT + ) { return; } this.jobIds.add(job.id); @@ -124,7 +125,7 @@ export class JobQueue { * processing, repeating this cycle until the jobs table is completely processed. */ private async runQueueLoop() { - while (!this.queue.isPaused) { + while (this.isRunning) { try { const loadedJobs = await this.addJobBatch(); if (loadedJobs === 0) { diff --git a/tests/job-queue.test.ts b/tests/job-queue.test.ts index 2b1d0891..9ab69d92 100644 --- a/tests/job-queue.test.ts +++ b/tests/job-queue.test.ts @@ -9,6 +9,7 @@ class TestJobQueue extends JobQueue { return this.add(job); } async testAddJobBatch(): Promise { + this['isRunning'] = true; // Simulate a running queue. return this.addJobBatch(); } } From 332f3d4169ad0bf8a756166e703aabb3d9a36347 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rafael=20C=C3=A1rdenas?= Date: Fri, 2 Dec 2022 11:59:33 -0600 Subject: [PATCH 2/2] chore: improve test queue --- tests/job-queue.test.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tests/job-queue.test.ts b/tests/job-queue.test.ts index 9ab69d92..7d60044c 100644 --- a/tests/job-queue.test.ts +++ b/tests/job-queue.test.ts @@ -5,11 +5,14 @@ import { JobQueue } from '../src/token-processor/queue/job-queue'; import { cycleMigrations } from './helpers'; class TestJobQueue extends JobQueue { + constructor(args: { db: PgStore }) { + super(args); + this['isRunning'] = true; // Simulate a running queue. + } async testAdd(job: DbJob): Promise { return this.add(job); } async testAddJobBatch(): Promise { - this['isRunning'] = true; // Simulate a running queue. return this.addJobBatch(); } }