Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch job processing #470

Closed
wants to merge 20 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,13 @@ to make sure the system as a whole remains consistent.
Read more:
[Worker Pro Migration](https://worker.graphile.org/docs/pro/migration).

## Pending

- BREAKING: Jobs and queues are now `locked_by` their `WorkerPool`'s id rather
than the `workerId`. Be sure to upgrade
[Worker Pro](https://worker.graphile.org/docs/pro) at the same time if you're
using it!

## v0.16.6

- Fix bug in `workerUtils.cleanup()` where queues would not be cleaned up if
Expand Down
2 changes: 1 addition & 1 deletion __tests__/main.runTaskListOnce.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ test("runs jobs asynchronously", () =>
expect(q.job_count).toEqual(1);
expect(+q.locked_at).toBeGreaterThanOrEqual(+start);
expect(+q.locked_at).toBeLessThanOrEqual(+new Date());
expect(q.locked_by).toEqual(worker.workerId);
expect(q.locked_by).toEqual(worker.workerPool.id);
}

jobPromise!.resolve();
Expand Down
2 changes: 1 addition & 1 deletion __tests__/resetLockedAt.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ test("main will execute jobs as they come up, and exits cleanly", () =>
`\
update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
set
locked_by = 'some_worker_id',
locked_by = 'some_pool_id',
locked_at = now() - (
case payload->>'id'
when 'locked_recently' then interval '5 minutes'
Expand Down
20 changes: 10 additions & 10 deletions __tests__/workerUtils.cleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,26 +71,26 @@ test("cleanup with GC_JOB_QUEUES", () =>
});

const jobs: Job[] = [];
const WORKER_ID_1 = "worker1";
const WORKER_ID_2 = "worker2";
const WORKER_ID_3 = "worker3";
const POOL_ID_1 = "pool-1";
const POOL_ID_2 = "pool-2";
const POOL_ID_3 = "pool-3";
let a = 0;
const date = new Date();
const specs = [
[WORKER_ID_1, "test", "test_job1"],
[WORKER_ID_2, "test2", "test_job2"],
[WORKER_ID_3, "test3", "test_job3"],
[POOL_ID_1, "test", "test_job1"],
[POOL_ID_2, "test2", "test_job2"],
[POOL_ID_3, "test3", "test_job3"],
[null, null, "test_job4"],
] as const;
for (const [workerId, queueName, taskIdentifier] of specs) {
for (const [poolId, queueName, taskIdentifier] of specs) {
date.setMinutes(date.getMinutes() - 1);
const job = await utils.addJob(
taskIdentifier,
{ a: ++a },
{ queueName: queueName ?? undefined },
);
jobs.push(job);
if (workerId) {
if (poolId) {
await pgClient.query(
`\
with j as (
Expand All @@ -107,7 +107,7 @@ with j as (
where job_queues.id = j.job_queue_id
)
select * from j`,
[date.toISOString(), workerId, job.id],
[date.toISOString(), poolId, job.id],
);
}
}
Expand All @@ -121,7 +121,7 @@ select * from j`,
"test3",
]);

await utils.forceUnlockWorkers(["worker3"]);
await utils.forceUnlockWorkers([POOL_ID_3]);
const thirdJob = jobs[2]; // Belongs to queueName 'task3'
await utils.completeJobs([thirdJob.id]);
await utils.cleanup({ tasks: ["GC_JOB_QUEUES"] });
Expand Down
28 changes: 14 additions & 14 deletions __tests__/workerUtils.forceUnlockWorkers.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,24 @@ test("unlocks jobs for the given workers, leaves others unaffected", () =>
});

const jobs: Job[] = [];
const WORKER_ID_1 = "worker1";
const WORKER_ID_2 = "worker2";
const WORKER_ID_3 = "worker3";
const POOL_ID_1 = "pool-1";
const POOL_ID_2 = "pool-2";
const POOL_ID_3 = "pool-3";
let a = 0;
const date = new Date();
const specs = [
[WORKER_ID_1, null],
[WORKER_ID_1, "test"],
[WORKER_ID_2, null],
[WORKER_ID_2, "test2"],
[WORKER_ID_2, "test3"],
[WORKER_ID_3, null],
[POOL_ID_1, null],
[POOL_ID_1, "test"],
[POOL_ID_2, null],
[POOL_ID_2, "test2"],
[POOL_ID_2, "test3"],
[POOL_ID_3, null],
[null, null],
[null, "test"],
[null, "test2"],
[null, "test3"],
] as const;
for (const [workerId, queueName] of specs) {
for (const [poolId, queueName] of specs) {
date.setMinutes(date.getMinutes() - 1);
const job = await utils.addJob(
"job3",
Expand All @@ -58,7 +58,7 @@ test("unlocks jobs for the given workers, leaves others unaffected", () =>
update ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
set locked_at = $1, locked_by = $2
where id = $3`,
[workerId ? date.toISOString() : null, workerId, job.id],
[poolId ? date.toISOString() : null, poolId, job.id],
);
jobs.push(job);
}
Expand All @@ -69,7 +69,7 @@ set locked_at = jobs.locked_at, locked_by = jobs.locked_by
from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}._private_jobs as jobs
where jobs.job_queue_id = job_queues.id;`,
);
await utils.forceUnlockWorkers([WORKER_ID_2, WORKER_ID_3]);
await utils.forceUnlockWorkers([POOL_ID_2, POOL_ID_3]);

const remaining = await getJobs(pgClient);
remaining.sort((a, z) => Number(a.id) - Number(z.id));
Expand All @@ -79,7 +79,7 @@ where jobs.job_queue_id = job_queues.id;`,
const job = jobs[i];
const updatedJob = remaining[i];
expect(updatedJob.id).toEqual(job.id);
if (spec[0] === WORKER_ID_2 || spec[0] === WORKER_ID_3) {
if (spec[0] === POOL_ID_2 || spec[0] === POOL_ID_3) {
expect(updatedJob.locked_by).toBeNull();
expect(updatedJob.locked_at).toBeNull();
} else if (spec[0]) {
Expand All @@ -97,7 +97,7 @@ where jobs.job_queue_id = job_queues.id;`,
expect(lockedQueues).toEqual([
expect.objectContaining({
queue_name: "test",
locked_by: WORKER_ID_1,
locked_by: POOL_ID_1,
}),
]);
}));
35 changes: 22 additions & 13 deletions perfTest/run.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,16 @@ async function main() {
console.log();
console.log();
console.log("Timing startup/shutdown time...");
const startupTime = await time(() => {
execSync("node ../dist/cli.js --once", execOptions);
let result;
const startupTime = await time(async () => {
result = await exec(
`node ../dist/cli.js --once --once -j ${CONCURRENCY} -m ${
CONCURRENCY + 1
}`,
execOptions,
);
});
logResult(result);
console.log();

if (STUCK_JOB_COUNT > 0) {
Expand Down Expand Up @@ -83,17 +90,7 @@ async function main() {
),
);
}
(await Promise.all(promises)).map(({ error, stdout, stderr }) => {
if (error) {
throw error;
}
if (stdout) {
console.log(stdout);
}
if (stderr) {
console.error(stderr);
}
});
(await Promise.all(promises)).map(logResult);
});
console.log(
`Jobs per second: ${((1000 * JOB_COUNT) / (dur - startupTime)).toFixed(2)}`,
Expand All @@ -112,3 +109,15 @@ main().catch((e) => {
console.error(e);
process.exit(1);
});

// Report the outcome of one child-process execution: rethrow failures,
// and mirror any captured stdout/stderr onto our own streams.
function logResult(processOutput) {
  // Abort the whole perf run loudly if the child process errored.
  if (processOutput.error) {
    throw processOutput.error;
  }
  const { stdout, stderr } = processOutput;
  // Only print non-empty captures to avoid blank lines in the report.
  if (stdout) {
    console.log(stdout);
  }
  if (stderr) {
    console.error(stderr);
  }
}
43 changes: 43 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,49 @@ declare global {
logger?: Logger;

events?: WorkerEvents;

/**
* To enable processing jobs in batches, set this to an integer larger
* than 1. This will result in jobs being fetched by the pool rather than
* the worker, the pool will fetch (and lock!) `getJobBatchSize` jobs up
* front, and each time a worker requests a job it will be served from
* this list until the list is exhausted, at which point a new set of
* jobs will be fetched (and locked).
*
* This setting can help reduce the load on your database from looking
* for jobs, but is only really effective when there are often many jobs
* queued and ready to go, and can increase the latency of job execution
* because a single worker may lock jobs into its queue leaving other
* workers idle.
*
* @default `-1`
*/
getJobBatchSize?: number;

/**
* The time in milliseconds to wait after a `completeJob` call to see if
* there are any other completeJob calls that can be batched together. A
* setting of `-1` disables this.
*
* Enabling this feature increases the time for which jobs are locked
* past completion, thus increasing the risk of catastrophic failure
* resulting in the jobs being executed again once they expire.
*
* @default `-1`
*/
completeJobBatchDelay?: number;

/**
* The time in milliseconds to wait after a `failJob` call to see if
* there are any other failJob calls that can be batched together. A
* setting of `-1` disables this.
*
* Enabling this feature increases the time for which jobs are locked
* past failure.
*
* @default `-1`
*/
failJobBatchDelay?: number;
}
interface Preset {
worker?: WorkerOptions;
Expand Down
22 changes: 22 additions & 0 deletions src/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,8 @@ export interface WorkerPool {
/** @internal */
_shuttingDown: boolean;
/** @internal */
_forcefulShuttingDown: boolean;
/** @internal */
_active: boolean;
/** @internal */
_workers: Worker[];
Expand Down Expand Up @@ -851,6 +853,15 @@ export type WorkerEventMap = {
client: PoolClient;
};

/**
* When a worker pool fails to complete/fail a job
*/
"pool:fatalError": {
workerPool: WorkerPool;
error: unknown;
action: string;
};

/**
* When a worker pool is released
*/
Expand Down Expand Up @@ -1157,3 +1168,14 @@ export interface WorkerPluginContext {
hooks: AsyncHooks<GraphileConfig.WorkerHooks>;
resolvedPreset: ResolvedWorkerPreset;
}
/**
 * Fetches the next job (if any) for the given worker to execute.
 *
 * @param workerId - Identifier of the worker requesting a job.
 * @param flagsToSkip - Job flags to exclude from the fetch, or `null` to
 *   apply no flag-based filtering.
 * @returns The locked job to run, or `undefined` when no job is available.
 */
export type GetJobFunction = (
  workerId: string,
  flagsToSkip: string[] | null,
) => Promise<Job | undefined>;

/**
 * Marks the given job as successfully completed.
 *
 * NOTE(review): returns `void` — presumably the completion may be batched
 * and flushed asynchronously (see `completeJobBatchDelay`); confirm against
 * the implementation.
 */
export type CompleteJobFunction = (job: DbJob) => void;
/**
 * Records a failure for the given job.
 *
 * @param job - The job that failed.
 * @param message - Human-readable reason for the failure.
 * @param replacementPayload - If provided, replaces the job's payload for
 *   subsequent attempts; `undefined` leaves the payload unchanged.
 */
export type FailJobFunction = (
  job: DbJob,
  message: string,
  replacementPayload: undefined | unknown[],
) => void;
Loading
Loading