From 4adc26c3d4894c0fd881d8483735af98ad47049e Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Tue, 10 Mar 2020 15:08:13 +1300 Subject: [PATCH 01/11] Duplicate worker code --- src/workerBatched.ts | 259 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 259 insertions(+) create mode 100644 src/workerBatched.ts diff --git a/src/workerBatched.ts b/src/workerBatched.ts new file mode 100644 index 00000000..c2ab07d5 --- /dev/null +++ b/src/workerBatched.ts @@ -0,0 +1,259 @@ +import * as assert from "assert"; +import { randomBytes } from "crypto"; + +import { defaults } from "./config"; +import deferred from "./deferred"; +import { makeJobHelpers } from "./helpers"; +import { + Job, + TaskList, + WithPgClient, + Worker, + WorkerOptions, +} from "./interfaces"; +import { processSharedOptions } from "./lib"; + +export function makeNewWorker( + options: WorkerOptions, + tasks: TaskList, + withPgClient: WithPgClient, + continuous = true, +): Worker { + const { + workerId = `worker-${randomBytes(9).toString("hex")}`, + pollInterval = defaults.pollInterval, + } = options; + const { + workerSchema, + escapedWorkerSchema, + logger, + maxContiguousErrors, + } = processSharedOptions(options, { + scope: { + label: "worker", + workerId, + }, + }); + const promise = deferred(); + let activeJob: Job | null = null; + + let doNextTimer: NodeJS.Timer | null = null; + const cancelDoNext = () => { + if (doNextTimer !== null) { + clearTimeout(doNextTimer); + doNextTimer = null; + return true; + } + return false; + }; + let active = true; + + const release = () => { + if (!active) { + return; + } + active = false; + if (cancelDoNext()) { + // Nothing in progress; resolve the promise + promise.resolve(); + } + return promise; + }; + + logger.debug(`Spawned`); + + let contiguousErrors = 0; + let again = false; + + const doNext = async (): Promise => { + again = false; + cancelDoNext(); + assert(active, "doNext called when active was false"); + assert(!activeJob, "There should be no active job"); + + // Find us a job + try { + const supportedTaskNames = Object.keys(tasks); + assert(supportedTaskNames.length, "No runnable tasks!"); + + const { + rows: [jobRow], + } = await withPgClient(client => + client.query({ + text: + // TODO: breaking change; change this to more optimal: + // `SELECT id, queue_name, task_identifier, payload FROM ${escapedWorkerSchema}.get_job($1, $2);`, + `SELECT * FROM ${escapedWorkerSchema}.get_job($1, $2);`, + values: [workerId, supportedTaskNames], + name: `get_job/${workerSchema}`, + }), + ); + + // `doNext` cannot be executed concurrently, so we know this is safe. + // eslint-disable-next-line require-atomic-updates + activeJob = jobRow && jobRow.id ? jobRow : null; + } catch (err) { + if (continuous) { + contiguousErrors++; + logger.debug( + `Failed to acquire job: ${err.message} (${contiguousErrors}/${maxContiguousErrors})`, + ); + if (contiguousErrors >= maxContiguousErrors) { + promise.reject( + new Error( + `Failed ${contiguousErrors} times in a row to acquire job; latest error: ${err.message}`, + ), + ); + release(); + return; + } else { + if (active) { + // Error occurred fetching a job; try again... + doNextTimer = setTimeout(() => doNext(), pollInterval); + } else { + promise.reject(err); + } + return; + } + } else { + promise.reject(err); + release(); + return; + } + } + contiguousErrors = 0; + + // If we didn't get a job, try again later (if appropriate) + if (!activeJob) { + if (continuous) { + if (active) { + if (again) { + // This could be a synchronisation issue where we were notified of + // the job but it's not visible yet, lets try again in just a + // moment. + doNext(); + } else { + doNextTimer = setTimeout(() => doNext(), pollInterval); + } + } else { + promise.resolve(); + } + } else { + promise.resolve(); + release(); + } + return; + } + + // We did get a job then; store it into the current scope. + const job = activeJob; + + // We may want to know if an error occurred or not + let err: Error | null = null; + try { + /* + * Be **VERY** careful about which parts of this code can throw - we + * **MUST** release the job once we've attempted it (success or error). + */ + const startTimestamp = process.hrtime(); + try { + logger.debug(`Found task ${job.id} (${job.task_identifier})`); + const task = tasks[job.task_identifier]; + assert(task, `Unsupported task '${job.task_identifier}'`); + const helpers = makeJobHelpers(options, job, { withPgClient, logger }); + await task(job.payload, helpers); + } catch (error) { + err = error; + } + const durationRaw = process.hrtime(startTimestamp); + const duration = durationRaw[0] * 1e3 + durationRaw[1] * 1e-6; + if (err) { + const { message, stack } = err; + logger.error( + `Failed task ${job.id} (${job.task_identifier}) with error ${ + err.message + } (${duration.toFixed(2)}ms)${ + stack + ? `:\n ${String(stack) + .replace(/\n/g, "\n ") + .trim()}` + : "" + }`, + { failure: true, job, error: err, duration }, + ); + // TODO: retry logic, in case of server connection interruption + await withPgClient(client => + client.query({ + text: `SELECT FROM ${escapedWorkerSchema}.fail_job($1, $2, $3);`, + values: [workerId, job.id, message], + name: `fail_job/${workerSchema}`, + }), + ); + } else { + if (!process.env.NO_LOG_SUCCESS) { + logger.info( + `Completed task ${job.id} (${ + job.task_identifier + }) with success (${duration.toFixed(2)}ms)`, + { job, duration, success: true }, + ); + } + // TODO: retry logic, in case of server connection interruption + await withPgClient(client => + client.query({ + text: `SELECT FROM ${escapedWorkerSchema}.complete_job($1, $2);`, + values: [workerId, job.id], + name: `complete_job/${workerSchema}`, + }), + ); + } + } catch (fatalError) { + const when = err ? `after failure '${err.message}'` : "after success"; + logger.error( + `Failed to release job '${job.id}' ${when}; committing seppuku\n${fatalError.message}`, + { fatalError, job }, + ); + promise.reject(fatalError); + release(); + return; + } finally { + // `doNext` cannot be executed concurrently, so we know this is safe. + // eslint-disable-next-line require-atomic-updates + activeJob = null; + } + if (active) { + doNext(); + } else { + promise.resolve(); + } + }; + + const nudge = () => { + assert(active, "nudge called after worker terminated"); + if (doNextTimer) { + // Must be idle; call early + doNext(); + return true; + } else { + again = true; + // Not idle; find someone else! + return false; + } + }; + + // Start! + doNext(); + + const worker = { + nudge, + workerId, + release, + promise, + getActiveJob: () => activeJob, + }; + + // For tests + promise["worker"] = worker; + + return worker; +} From 32f078733c9c511fb2e3e2a694394ad514127000 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Tue, 10 Mar 2020 17:07:33 +1300 Subject: [PATCH 02/11] Batched job processing POC --- sql/000005.sql | 109 ++++++++++++++++++++++++++++++++++ src/worker.ts | 158 ++++++++++++++++++++++++++++--------------------- 2 files changed, 201 insertions(+), 66 deletions(-) create mode 100644 sql/000005.sql diff --git a/sql/000005.sql b/sql/000005.sql new file mode 100644 index 00000000..2d64a55d --- /dev/null +++ b/sql/000005.sql @@ -0,0 +1,109 @@ +create or replace function :GRAPHILE_WORKER_SCHEMA.get_jobs( + worker_id text, + task_identifiers text[] = null, + job_expiry interval = interval '4 hours' +) returns setof :GRAPHILE_WORKER_SCHEMA.jobs as $$ +declare + v_jobs jsonb[]; + v_now timestamptz = now(); +begin + if worker_id is null or length(worker_id) < 10 then + raise exception 'invalid worker id'; + end if; + + select array( + select jsonb_build_object( + 'queue_name', + jobs.queue_name, + 'id', + jobs.id + ) + from :GRAPHILE_WORKER_SCHEMA.jobs + where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry)) + and ( + jobs.queue_name is null + or + exists ( + select 1 + from :GRAPHILE_WORKER_SCHEMA.job_queues + where job_queues.queue_name = jobs.queue_name + and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry)) + for update + skip locked + ) + ) + and run_at <= v_now + and attempts < max_attempts + and (task_identifiers is null or task_identifier = any(task_identifiers)) + order by priority asc, run_at asc, id asc + limit 10 + for update + skip locked + ) into v_jobs; + + -- if v_job_id is null then + -- return null; + -- end if; + + update :GRAPHILE_WORKER_SCHEMA.job_queues + set + locked_by = worker_id, + locked_at = v_now + where job_queues.queue_name = any(select (j->>'queue_name') from unnest(v_jobs) j); + + return query update :GRAPHILE_WORKER_SCHEMA.jobs + set + attempts = attempts + 1, + locked_by = worker_id, + locked_at = v_now + where id = any(select (j->>'id')::bigint from unnest(v_jobs) j) + returning *; +end; +$$ language plpgsql volatile; + + +create or replace function :GRAPHILE_WORKER_SCHEMA.complete_batch( + worker_id text, + success_ids bigint[], + failures json[] -- format {"id": string, "message": string} +) returns void as $$ +declare + v_row :GRAPHILE_WORKER_SCHEMA.jobs; +begin + with failures as ( + update :GRAPHILE_WORKER_SCHEMA.jobs + set + last_error = f->>'error_message', + run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval, + locked_by = null, + locked_at = null + from unnest(failures) f + where id = (f->>'id')::bigint and locked_by = worker_id + returning queue_name + ) + update :GRAPHILE_WORKER_SCHEMA.job_queues jq + set locked_by = null, locked_at = null + where exists ( + select 1 + from unnest(failures) f + where jq.queue_name = f->>'queue_name' + ) and locked_by = worker_id; + + with successes as ( + delete from :GRAPHILE_WORKER_SCHEMA.jobs + where id = any(success_ids) + returning queue_name + ) + update :GRAPHILE_WORKER_SCHEMA.job_queues jq + set locked_by = null, locked_at = null + where exists ( + select 1 + from successes s + where jq.queue_name = s.queue_name + ) and locked_by = worker_id; + +end; +$$ language plpgsql volatile strict; + + + diff --git a/src/worker.ts b/src/worker.ts index c2ab07d5..5ee4b01c 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -35,7 +35,7 @@ export function makeNewWorker( }, }); const promise = deferred(); - let activeJob: Job | null = null; + let activeJobs: Job[] | null = null; let doNextTimer: NodeJS.Timer | null = null; const cancelDoNext = () => { @@ -69,21 +69,19 @@ export function makeNewWorker( again = false; cancelDoNext(); assert(active, "doNext called when active was false"); - assert(!activeJob, "There should be no active job"); + assert(!activeJobs, "There should be no active job"); // Find us a job try { const supportedTaskNames = Object.keys(tasks); assert(supportedTaskNames.length, "No runnable tasks!"); - const { - rows: [jobRow], - } = await withPgClient(client => + const { rows: jobRows } = await withPgClient(client => client.query({ text: // TODO: breaking change; change this to more optimal: // `SELECT id, queue_name, task_identifier, payload FROM ${escapedWorkerSchema}.get_job($1, $2);`, - `SELECT * FROM ${escapedWorkerSchema}.get_job($1, $2);`, + `SELECT * FROM ${escapedWorkerSchema}.get_jobs($1, $2);`, values: [workerId, supportedTaskNames], name: `get_job/${workerSchema}`, }), @@ -91,7 +89,15 @@ export function makeNewWorker( // `doNext` cannot be executed concurrently, so we know this is safe. // eslint-disable-next-line require-atomic-updates - activeJob = jobRow && jobRow.id ? jobRow : null; + const validJobs = jobRows.filter(r => r.id); + + if (validJobs.length > 1) { + console.log("MULTIPLE JOBS", validJobs); + // throw new Error("STOP"); + } else { + console.log("checked jobs", validJobs.length); + } + activeJobs = validJobs.length ? validJobs.filter(r => r.id) : null; } catch (err) { if (continuous) { contiguousErrors++; @@ -124,7 +130,7 @@ export function makeNewWorker( contiguousErrors = 0; // If we didn't get a job, try again later (if appropriate) - if (!activeJob) { + if (!activeJobs) { if (continuous) { if (active) { if (again) { @@ -146,72 +152,92 @@ export function makeNewWorker( } // We did get a job then; store it into the current scope. - const job = activeJob; + const jobs = activeJobs; // We may want to know if an error occurred or not let err: Error | null = null; try { - /* - * Be **VERY** careful about which parts of this code can throw - we - * **MUST** release the job once we've attempted it (success or error). - */ - const startTimestamp = process.hrtime(); - try { - logger.debug(`Found task ${job.id} (${job.task_identifier})`); - const task = tasks[job.task_identifier]; - assert(task, `Unsupported task '${job.task_identifier}'`); - const helpers = makeJobHelpers(options, job, { withPgClient, logger }); - await task(job.payload, helpers); - } catch (error) { - err = error; - } - const durationRaw = process.hrtime(startTimestamp); - const duration = durationRaw[0] * 1e3 + durationRaw[1] * 1e-6; - if (err) { - const { message, stack } = err; - logger.error( - `Failed task ${job.id} (${job.task_identifier}) with error ${ - err.message - } (${duration.toFixed(2)}ms)${ - stack - ? `:\n ${String(stack) - .replace(/\n/g, "\n ") - .trim()}` - : "" - }`, - { failure: true, job, error: err, duration }, - ); - // TODO: retry logic, in case of server connection interruption - await withPgClient(client => - client.query({ - text: `SELECT FROM ${escapedWorkerSchema}.fail_job($1, $2, $3);`, - values: [workerId, job.id, message], - name: `fail_job/${workerSchema}`, - }), - ); - } else { - if (!process.env.NO_LOG_SUCCESS) { - logger.info( - `Completed task ${job.id} (${ - job.task_identifier - }) with success (${duration.toFixed(2)}ms)`, - { job, duration, success: true }, + const successes: string[] = []; + const failures: { id: string; message: string }[] = []; + + for (const job of jobs) { + /* + * Be **VERY** careful about which parts of this code can throw - we + * **MUST** release the job once we've attempted it (success or error). + */ + const startTimestamp = process.hrtime(); + try { + logger.debug(`Found task ${job.id} (${job.task_identifier})`); + const task = tasks[job.task_identifier]; + assert(task, `Unsupported task '${job.task_identifier}'`); + const helpers = makeJobHelpers(options, job, { + withPgClient, + logger, + }); + await task(job.payload, helpers); + } catch (error) { + err = error; + } + const durationRaw = process.hrtime(startTimestamp); + const duration = durationRaw[0] * 1e3 + durationRaw[1] * 1e-6; + if (err) { + const { message, stack } = err; + logger.error( + `Failed task ${job.id} (${job.task_identifier}) with error ${ + err.message + } (${duration.toFixed(2)}ms)${ + stack + ? `:\n ${String(stack) + .replace(/\n/g, "\n ") + .trim()}` + : "" + }`, + { failure: true, job, error: err, duration }, ); + // TODO: retry logic, in case of server connection interruption + // await withPgClient(client => + // client.query({ + // text: `SELECT FROM ${escapedWorkerSchema}.fail_job($1, $2, $3);`, + // values: [workerId, job.id, message], + // name: `fail_job/${workerSchema}`, + // }), + // ); + failures.push({ ...job, message }); + } else { + if (!process.env.NO_LOG_SUCCESS) { + logger.info( + `Completed task ${job.id} (${ + job.task_identifier + }) with success (${duration.toFixed(2)}ms)`, + { job, duration, success: true }, + ); + } + // TODO: retry logic, in case of server connection interruption + // await withPgClient(client => + // client.query({ + // text: `SELECT FROM ${escapedWorkerSchema}.complete_job($1, $2);`, + // values: [workerId, job.id], + // name: `complete_job/${workerSchema}`, + // }), + // ); + successes.push(job.id); } - // TODO: retry logic, in case of server connection interruption - await withPgClient(client => - client.query({ - text: `SELECT FROM ${escapedWorkerSchema}.complete_job($1, $2);`, - values: [workerId, job.id], - name: `complete_job/${workerSchema}`, - }), - ); } + + await withPgClient(client => + client.query({ + text: `SELECT FROM ${escapedWorkerSchema}.complete_batch($1, $2, $3);`, + values: [workerId, successes, failures], + name: `complete_batch/${workerSchema}`, + }), + ); } catch (fatalError) { const when = err ? `after failure '${err.message}'` : "after success"; logger.error( - `Failed to release job '${job.id}' ${when}; committing seppuku\n${fatalError.message}`, - { fatalError, job }, + `Failed to release jobs ${jobs + .map(j => j.id) + .join(", ")} ${when}; shutting down\n${fatalError.message}`, + { fatalError, jobs }, ); promise.reject(fatalError); release(); @@ -219,7 +245,7 @@ export function makeNewWorker( } finally { // `doNext` cannot be executed concurrently, so we know this is safe. // eslint-disable-next-line require-atomic-updates - activeJob = null; + activeJobs = null; } if (active) { doNext(); @@ -249,7 +275,7 @@ export function makeNewWorker( workerId, release, promise, - getActiveJob: () => activeJob, + getActiveJob: () => null, // TODO }; // For tests From 04de4b5585822fb05574afd3be1519cdb15a2b56 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Mon, 16 Mar 2020 13:55:42 +1300 Subject: [PATCH 03/11] Return early if no jobs found (as before), update tests --- __tests__/migrate.test.ts | 2 +- sql/000005.sql | 37 +++++++++++++++++++------------------ src/worker.ts | 6 ------ 3 files changed, 20 insertions(+), 25 deletions(-) diff --git a/__tests__/migrate.test.ts b/__tests__/migrate.test.ts index edd25c1f..4a943ab2 100644 --- a/__tests__/migrate.test.ts +++ b/__tests__/migrate.test.ts @@ -33,7 +33,7 @@ test("migration installs schema; second migration does no harm", async () => { const { rows: migrationRows } = await pgClient.query( `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations`, ); - expect(migrationRows).toHaveLength(4); + expect(migrationRows).toHaveLength(5); const migration = migrationRows[0]; expect(migration.id).toEqual(1); diff --git a/sql/000005.sql b/sql/000005.sql index 2d64a55d..44ab5e0b 100644 --- a/sql/000005.sql +++ b/sql/000005.sql @@ -36,28 +36,29 @@ begin and attempts < max_attempts and (task_identifiers is null or task_identifier = any(task_identifiers)) order by priority asc, run_at asc, id asc - limit 10 + + -- TODO make this a configurable value + limit 1 + for update skip locked ) into v_jobs; - -- if v_job_id is null then - -- return null; - -- end if; - - update :GRAPHILE_WORKER_SCHEMA.job_queues - set - locked_by = worker_id, - locked_at = v_now - where job_queues.queue_name = any(select (j->>'queue_name') from unnest(v_jobs) j); + if array_length(v_jobs, 1) is not null then + update :GRAPHILE_WORKER_SCHEMA.job_queues + set + locked_by = worker_id, + locked_at = v_now + where job_queues.queue_name = any(select (j->>'queue_name') from unnest(v_jobs) j); - return query update :GRAPHILE_WORKER_SCHEMA.jobs - set - attempts = attempts + 1, - locked_by = worker_id, - locked_at = v_now - where id = any(select (j->>'id')::bigint from unnest(v_jobs) j) - returning *; + return query update :GRAPHILE_WORKER_SCHEMA.jobs + set + attempts = attempts + 1, + locked_by = worker_id, + locked_at = v_now + where id = any(select (j->>'id')::bigint from unnest(v_jobs) j) + returning *; + end if; end; $$ language plpgsql volatile; @@ -73,7 +74,7 @@ begin with failures as ( update :GRAPHILE_WORKER_SCHEMA.jobs set - last_error = f->>'error_message', + last_error = f->>'message', run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval, locked_by = null, locked_at = null diff --git a/src/worker.ts b/src/worker.ts index 5ee4b01c..c9df4cec 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -91,12 +91,6 @@ export function makeNewWorker( // eslint-disable-next-line require-atomic-updates const validJobs = jobRows.filter(r => r.id); - if (validJobs.length > 1) { - console.log("MULTIPLE JOBS", validJobs); - // throw new Error("STOP"); - } else { - console.log("checked jobs", validJobs.length); - } activeJobs = validJobs.length ? validJobs.filter(r => r.id) : null; } catch (err) { if (continuous) { From ba6e034f75bff5f79f526cdd624ac32ae80112ed Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Mon, 16 Mar 2020 14:00:55 +1300 Subject: [PATCH 04/11] cleanup --- src/worker.ts | 19 +--- src/workerBatched.ts | 259 ------------------------------------------- 2 files changed, 3 insertions(+), 275 deletions(-) delete mode 100644 src/workerBatched.ts diff --git a/src/worker.ts b/src/worker.ts index c9df4cec..4bff0a0e 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -188,14 +188,7 @@ export function makeNewWorker( }`, { failure: true, job, error: err, duration }, ); - // TODO: retry logic, in case of server connection interruption - // await withPgClient(client => - // client.query({ - // text: `SELECT FROM ${escapedWorkerSchema}.fail_job($1, $2, $3);`, - // values: [workerId, job.id, message], - // name: `fail_job/${workerSchema}`, - // }), - // ); + failures.push({ ...job, message }); } else { if (!process.env.NO_LOG_SUCCESS) { @@ -206,18 +199,12 @@ export function makeNewWorker( { job, duration, success: true }, ); } - // TODO: retry logic, in case of server connection interruption - // await withPgClient(client => - // client.query({ - // text: `SELECT FROM ${escapedWorkerSchema}.complete_job($1, $2);`, - // values: [workerId, job.id], - // name: `complete_job/${workerSchema}`, - // }), - // ); + successes.push(job.id); } } + // TODO: retry logic, in case of server connection interruption await withPgClient(client => client.query({ text: `SELECT FROM ${escapedWorkerSchema}.complete_batch($1, $2, $3);`, diff --git a/src/workerBatched.ts b/src/workerBatched.ts deleted file mode 100644 index c2ab07d5..00000000 --- a/src/workerBatched.ts +++ /dev/null @@ -1,259 +0,0 @@ -import * as assert from "assert"; -import { randomBytes } from "crypto"; - -import { defaults } from "./config"; -import deferred from "./deferred"; -import { makeJobHelpers } from "./helpers"; -import { - Job, - TaskList, - WithPgClient, - Worker, - WorkerOptions, -} from "./interfaces"; -import { processSharedOptions } from "./lib"; - -export function makeNewWorker( - options: WorkerOptions, - tasks: TaskList, - withPgClient: WithPgClient, - continuous = true, -): Worker { - const { - workerId = `worker-${randomBytes(9).toString("hex")}`, - pollInterval = defaults.pollInterval, - } = options; - const { - workerSchema, - escapedWorkerSchema, - logger, - maxContiguousErrors, - } = processSharedOptions(options, { - scope: { - label: "worker", - workerId, - }, - }); - const promise = deferred(); - let activeJob: Job | null = null; - - let doNextTimer: NodeJS.Timer | null = null; - const cancelDoNext = () => { - if (doNextTimer !== null) { - clearTimeout(doNextTimer); - doNextTimer = null; - return true; - } - return false; - }; - let active = true; - - const release = () => { - if (!active) { - return; - } - active = false; - if (cancelDoNext()) { - // Nothing in progress; resolve the promise - promise.resolve(); - } - return promise; - }; - - logger.debug(`Spawned`); - - let contiguousErrors = 0; - let again = false; - - const doNext = async (): Promise => { - again = false; - cancelDoNext(); - assert(active, "doNext called when active was false"); - assert(!activeJob, "There should be no active job"); - - // Find us a job - try { - const supportedTaskNames = Object.keys(tasks); - assert(supportedTaskNames.length, "No runnable tasks!"); - - const { - rows: [jobRow], - } = await withPgClient(client => - client.query({ - text: - // TODO: breaking change; change this to more optimal: - // `SELECT id, queue_name, task_identifier, payload FROM ${escapedWorkerSchema}.get_job($1, $2);`, - `SELECT * FROM ${escapedWorkerSchema}.get_job($1, $2);`, - values: [workerId, supportedTaskNames], - name: `get_job/${workerSchema}`, - }), - ); - - // `doNext` cannot be executed concurrently, so we know this is safe. - // eslint-disable-next-line require-atomic-updates - activeJob = jobRow && jobRow.id ? jobRow : null; - } catch (err) { - if (continuous) { - contiguousErrors++; - logger.debug( - `Failed to acquire job: ${err.message} (${contiguousErrors}/${maxContiguousErrors})`, - ); - if (contiguousErrors >= maxContiguousErrors) { - promise.reject( - new Error( - `Failed ${contiguousErrors} times in a row to acquire job; latest error: ${err.message}`, - ), - ); - release(); - return; - } else { - if (active) { - // Error occurred fetching a job; try again... - doNextTimer = setTimeout(() => doNext(), pollInterval); - } else { - promise.reject(err); - } - return; - } - } else { - promise.reject(err); - release(); - return; - } - } - contiguousErrors = 0; - - // If we didn't get a job, try again later (if appropriate) - if (!activeJob) { - if (continuous) { - if (active) { - if (again) { - // This could be a synchronisation issue where we were notified of - // the job but it's not visible yet, lets try again in just a - // moment. - doNext(); - } else { - doNextTimer = setTimeout(() => doNext(), pollInterval); - } - } else { - promise.resolve(); - } - } else { - promise.resolve(); - release(); - } - return; - } - - // We did get a job then; store it into the current scope. - const job = activeJob; - - // We may want to know if an error occurred or not - let err: Error | null = null; - try { - /* - * Be **VERY** careful about which parts of this code can throw - we - * **MUST** release the job once we've attempted it (success or error). - */ - const startTimestamp = process.hrtime(); - try { - logger.debug(`Found task ${job.id} (${job.task_identifier})`); - const task = tasks[job.task_identifier]; - assert(task, `Unsupported task '${job.task_identifier}'`); - const helpers = makeJobHelpers(options, job, { withPgClient, logger }); - await task(job.payload, helpers); - } catch (error) { - err = error; - } - const durationRaw = process.hrtime(startTimestamp); - const duration = durationRaw[0] * 1e3 + durationRaw[1] * 1e-6; - if (err) { - const { message, stack } = err; - logger.error( - `Failed task ${job.id} (${job.task_identifier}) with error ${ - err.message - } (${duration.toFixed(2)}ms)${ - stack - ? `:\n ${String(stack) - .replace(/\n/g, "\n ") - .trim()}` - : "" - }`, - { failure: true, job, error: err, duration }, - ); - // TODO: retry logic, in case of server connection interruption - await withPgClient(client => - client.query({ - text: `SELECT FROM ${escapedWorkerSchema}.fail_job($1, $2, $3);`, - values: [workerId, job.id, message], - name: `fail_job/${workerSchema}`, - }), - ); - } else { - if (!process.env.NO_LOG_SUCCESS) { - logger.info( - `Completed task ${job.id} (${ - job.task_identifier - }) with success (${duration.toFixed(2)}ms)`, - { job, duration, success: true }, - ); - } - // TODO: retry logic, in case of server connection interruption - await withPgClient(client => - client.query({ - text: `SELECT FROM ${escapedWorkerSchema}.complete_job($1, $2);`, - values: [workerId, job.id], - name: `complete_job/${workerSchema}`, - }), - ); - } - } catch (fatalError) { - const when = err ? `after failure '${err.message}'` : "after success"; - logger.error( - `Failed to release job '${job.id}' ${when}; committing seppuku\n${fatalError.message}`, - { fatalError, job }, - ); - promise.reject(fatalError); - release(); - return; - } finally { - // `doNext` cannot be executed concurrently, so we know this is safe. - // eslint-disable-next-line require-atomic-updates - activeJob = null; - } - if (active) { - doNext(); - } else { - promise.resolve(); - } - }; - - const nudge = () => { - assert(active, "nudge called after worker terminated"); - if (doNextTimer) { - // Must be idle; call early - doNext(); - return true; - } else { - again = true; - // Not idle; find someone else! - return false; - } - }; - - // Start! - doNext(); - - const worker = { - nudge, - workerId, - release, - promise, - getActiveJob: () => activeJob, - }; - - // For tests - promise["worker"] = worker; - - return worker; -} From 677956533e34ecf019a9a75f517fe4afc06afc35 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Mon, 16 Mar 2020 14:30:50 +1300 Subject: [PATCH 05/11] Fix dupe filter --- src/worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/worker.ts b/src/worker.ts index 4bff0a0e..aff942e2 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -91,7 +91,7 @@ export function makeNewWorker( // eslint-disable-next-line require-atomic-updates const validJobs = jobRows.filter(r => r.id); - activeJobs = validJobs.length ? validJobs.filter(r => r.id) : null; + activeJobs = validJobs.length ? validJobs : null; } catch (err) { if (continuous) { contiguousErrors++; From 28bc3945b9702c1ee5a29e97bf838bb32bbc4af8 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Mon, 16 Mar 2020 14:45:52 +1300 Subject: [PATCH 06/11] optimise complete_batch function a little --- sql/000005.sql | 64 +++++++++++++++++++++++++++----------------------- src/worker.ts | 1 + 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/sql/000005.sql b/sql/000005.sql index 44ab5e0b..61648323 100644 --- a/sql/000005.sql +++ b/sql/000005.sql @@ -71,37 +71,41 @@ create or replace function :GRAPHILE_WORKER_SCHEMA.complete_batch( declare v_row :GRAPHILE_WORKER_SCHEMA.jobs; begin - with failures as ( - update :GRAPHILE_WORKER_SCHEMA.jobs - set - last_error = f->>'message', - run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval, - locked_by = null, - locked_at = null - from unnest(failures) f - where id = (f->>'id')::bigint and locked_by = worker_id - returning queue_name - ) - update :GRAPHILE_WORKER_SCHEMA.job_queues jq - set locked_by = null, locked_at = null - where exists ( - select 1 - from unnest(failures) f - where jq.queue_name = f->>'queue_name' - ) and locked_by = worker_id; + if array_length(failures, 1) is not null then + with fail_jobs as ( + update :GRAPHILE_WORKER_SCHEMA.jobs + set + last_error = f->>'message', + run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval, + locked_by = null, + locked_at = null + from unnest(failures) f + where id = (f->>'id')::bigint and locked_by = worker_id + returning queue_name + ) + update :GRAPHILE_WORKER_SCHEMA.job_queues jq + set locked_by = null, locked_at = null + where exists ( + select 1 + from fail_jobs f + where jq.queue_name = f.queue_name + ) and locked_by = worker_id; + end if; - with successes as ( - delete from :GRAPHILE_WORKER_SCHEMA.jobs - where id = any(success_ids) - returning queue_name - ) - update :GRAPHILE_WORKER_SCHEMA.job_queues jq - set locked_by = null, locked_at = null - where exists ( - select 1 - from successes s - where jq.queue_name = s.queue_name - ) and locked_by = worker_id; + if array_length(success_ids, 1) is not null then + with success_jobs as ( + delete from :GRAPHILE_WORKER_SCHEMA.jobs + where id = any(success_ids) + returning queue_name + ) + update :GRAPHILE_WORKER_SCHEMA.job_queues jq + set locked_by = null, locked_at = null + where exists ( + select 1 + from success_jobs s + where jq.queue_name = s.queue_name + ) and locked_by = worker_id; + end if; end; $$ language plpgsql volatile strict; diff --git a/src/worker.ts b/src/worker.ts index aff942e2..ced2eb38 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -154,6 +154,7 @@ export function makeNewWorker( const successes: string[] = []; const failures: { id: string; message: string }[] = []; + // TODO consider using Promise.all here to run jobs in parallel for (const job of jobs) { /* * Be **VERY** careful about which parts of this code can throw - we From 251c5f1d6539ed11d424d7cb3e2697cc414b1104 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Mon, 16 Mar 2020 14:51:41 +1300 Subject: [PATCH 07/11] Tiny optimisation --- src/worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/worker.ts b/src/worker.ts index ced2eb38..584c51a2 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -190,7 +190,7 @@ export function makeNewWorker( { failure: true, job, error: err, duration }, ); - failures.push({ ...job, message }); + failures.push({ id: job.id, message }); } else { if (!process.env.NO_LOG_SUCCESS) { logger.info( From 863184cbfdceba4154c379073a3c9e3dc5709a7d Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Mon, 16 Mar 2020 16:16:34 +1300 Subject: [PATCH 08/11] optimise get_jobs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Master branch is getting ~2150 jobs/s for me The original batched get_jobs was performing really badly at ~680 jobs/s. Using a composite type instead of json internally increased that to ~840 Replacing the select array … into with a CTE got it right back up to ~1840 jobs/s at batch size 1. With batch size 4 that jumps to ~4400. --- sql/000005.sql | 48 ++++++++++++++++++++++++++---------------------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/sql/000005.sql b/sql/000005.sql index 61648323..00c5960f 100644 --- a/sql/000005.sql +++ b/sql/000005.sql @@ -4,21 +4,16 @@ create or replace function :GRAPHILE_WORKER_SCHEMA.get_jobs( job_expiry interval = interval '4 hours' ) returns setof :GRAPHILE_WORKER_SCHEMA.jobs as $$ declare - v_jobs jsonb[]; v_now timestamptz = now(); begin if worker_id is null or length(worker_id) < 10 then raise exception 'invalid worker id'; end if; - select array( - select jsonb_build_object( - 'queue_name', - jobs.queue_name, - 'id', - jobs.id - ) - from :GRAPHILE_WORKER_SCHEMA.jobs + return query with jobs_q as ( + select + id, queue_name + from :GRAPHILE_WORKER_SCHEMA.jobs jobs where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry)) and ( jobs.queue_name is null @@ -42,23 +37,29 @@ begin for update skip locked - ) into v_jobs; - - if array_length(v_jobs, 1) is not null then + ), + queues_q as ( update :GRAPHILE_WORKER_SCHEMA.job_queues set locked_by = worker_id, locked_at = v_now - where job_queues.queue_name = any(select (j->>'queue_name') from unnest(v_jobs) j); - - return query update :GRAPHILE_WORKER_SCHEMA.jobs - set - attempts = attempts + 1, - locked_by = worker_id, - locked_at = v_now - where id = any(select (j->>'id')::bigint from unnest(v_jobs) j) - returning *; - end if; + where exists( + select 1 + from jobs_q + where jobs_q.queue_name = job_queues.queue_name + ) + ) + update :GRAPHILE_WORKER_SCHEMA.jobs + set + attempts = attempts + 1, + locked_by = worker_id, + locked_at = v_now + where exists( + select 1 + from jobs_q + where jobs_q.id = :GRAPHILE_WORKER_SCHEMA.jobs.id + ) + returning *; end; $$ language plpgsql volatile; @@ -112,3 +113,6 @@ $$ language plpgsql volatile strict; + + + From 8f6c6f8f0c28d67a5218d9defb7f47e9289684e9 Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Tue, 22 Sep 2020 12:15:04 +1200 Subject: [PATCH 09/11] Pacify prettier --- src/worker.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/src/worker.ts b/src/worker.ts index 09768b75..98ac167e 100644 --- a/src/worker.ts +++ b/src/worker.ts @@ -92,7 +92,7 @@ export function makeNewWorker( } } - const { rows: jobRows } = await withPgClient(client => + const { rows: jobRows } = await withPgClient((client) => client.query({ text: // TODO: breaking change; change this to more optimal: @@ -109,7 +109,7 @@ export function makeNewWorker( // `doNext` cannot be executed concurrently, so we know this is safe. // eslint-disable-next-line require-atomic-updates - const validJobs = jobRows.filter(r => r.id); + const validJobs = jobRows.filter((r) => r.id); activeJobs = validJobs.length ? validJobs : null; } catch (err) { @@ -210,11 +210,7 @@ export function makeNewWorker( `Failed task ${job.id} (${ job.task_identifier }) with error ${message} (${duration.toFixed(2)}ms)${ - stack - ? `:\n ${String(stack) - .replace(/\n/g, "\n ") - .trim()}` - : "" + stack ? `:\n ${String(stack).replace(/\n/g, "\n ").trim()}` : "" }`, { failure: true, job, error: err, duration }, ); @@ -254,7 +250,7 @@ export function makeNewWorker( } // TODO: retry logic, in case of server connection interruption - await withPgClient(client => + await withPgClient((client) => client.query({ text: `SELECT FROM ${escapedWorkerSchema}.complete_batch($1, $2, $3);`, values: [workerId, successes, failures], @@ -265,7 +261,7 @@ export function makeNewWorker( const when = err ? `after failure '${err.message}'` : "after success"; logger.error( `Failed to release jobs ${jobs - .map(j => j.id) + .map((j) => j.id) .join(", ")} ${when}; shutting down\n${fatalError.message}`, { fatalError, jobs }, ); From c51f0882d392b9fba60f8462742a182fe4cc556c Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Tue, 22 Sep 2020 13:38:20 +1200 Subject: [PATCH 10/11] Migration bump --- __tests__/migrate.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/__tests__/migrate.test.ts b/__tests__/migrate.test.ts index c1d8bc0e..e525794c 100644 --- a/__tests__/migrate.test.ts +++ b/__tests__/migrate.test.ts @@ -33,7 +33,7 @@ test("migration installs schema; second migration does no harm", async () => { const { rows: migrationRows } = await pgClient.query( `select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations`, ); - expect(migrationRows).toHaveLength(5); + expect(migrationRows).toHaveLength(6); const migration = migrationRows[0]; expect(migration.id).toEqual(1); From f51638fc4d7d9c45163f17b14f0e2a216d892e5f Mon Sep 17 00:00:00 2001 From: Greg Brown Date: Wed, 23 Sep 2020 11:11:21 +1200 Subject: [PATCH 11/11] Update db dump --- __tests__/schema.sql | 104 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/__tests__/schema.sql b/__tests__/schema.sql index d72b2f08..c27308a3 100644 --- a/__tests__/schema.sql +++ b/__tests__/schema.sql @@ -125,6 +125,49 @@ begin end; $$; ALTER FUNCTION graphile_worker.add_job(identifier text, payload json, queue_name text, run_at timestamp with time zone, max_attempts integer, job_key text, priority integer, flags text[]) OWNER TO graphile_worker_role; +CREATE FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) RETURNS void + LANGUAGE plpgsql STRICT + AS $$ +declare + v_row "graphile_worker".jobs; +begin + if array_length(failures, 1) is not null then + with fail_jobs as ( + update "graphile_worker".jobs + set + last_error = f->>'message', + run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval, + locked_by = null, + locked_at = null + from unnest(failures) f + where id = (f->>'id')::bigint and locked_by = worker_id + returning queue_name + ) + update "graphile_worker".job_queues jq + set locked_by = null, locked_at = null + where exists ( + select 1 + from fail_jobs f + where jq.queue_name = f.queue_name + ) and locked_by = worker_id; + end if; + if array_length(success_ids, 1) is not null then + with success_jobs as ( + delete from "graphile_worker".jobs + where id = any(success_ids) + returning queue_name + ) + update "graphile_worker".job_queues jq + set locked_by = null, locked_at = null + where exists ( + select 1 + from success_jobs s + where jq.queue_name = s.queue_name + ) and locked_by = worker_id; + end if; +end; +$$; +ALTER FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) OWNER TO graphile_worker_role; CREATE FUNCTION graphile_worker.complete_job(worker_id text, job_id bigint) RETURNS graphile_worker.jobs LANGUAGE plpgsql AS $$ @@ -235,6 +278,67 @@ begin end; $$; ALTER FUNCTION graphile_worker.get_job(worker_id text, task_identifiers text[], job_expiry interval, forbidden_flags text[]) OWNER TO graphile_worker_role; +CREATE FUNCTION graphile_worker.get_jobs(worker_id text, task_identifiers text[] DEFAULT NULL::text[], job_expiry interval DEFAULT '04:00:00'::interval, forbidden_flags text[] DEFAULT NULL::text[]) RETURNS SETOF graphile_worker.jobs + LANGUAGE plpgsql + AS $$ +declare + v_now timestamptz = now(); +begin + if worker_id is null or length(worker_id) < 10 then + raise exception 'invalid worker id'; + end if; + return query with jobs_q as ( + select + id, queue_name + from "graphile_worker".jobs jobs + where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry)) + and ( + jobs.queue_name is null + or + exists ( + select 1 + from "graphile_worker".job_queues + where job_queues.queue_name = jobs.queue_name + and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry)) + for update + skip locked + ) + ) + and run_at <= v_now + and attempts < max_attempts + and (task_identifiers is null or task_identifier = any(task_identifiers)) + and (forbidden_flags is null or (flags ?| forbidden_flags) is not true) + order by priority asc, run_at asc, id asc + -- TODO make this a configurable value + limit 1 + for update + skip locked + ), + queues_q as ( + update "graphile_worker".job_queues + set + locked_by = worker_id, + locked_at = v_now + where exists( + select 1 + from jobs_q + where jobs_q.queue_name = job_queues.queue_name + ) + ) + update "graphile_worker".jobs + set + attempts = attempts + 1, + locked_by = worker_id, + locked_at = v_now + where exists( + select 1 + from jobs_q + where jobs_q.id = "graphile_worker".jobs.id + ) + returning *; +end; +$$; +ALTER FUNCTION graphile_worker.get_jobs(worker_id text, task_identifiers text[], job_expiry interval, forbidden_flags text[]) OWNER TO graphile_worker_role; CREATE FUNCTION graphile_worker.jobs__decrease_job_queue_count() RETURNS trigger LANGUAGE plpgsql AS $$