POC - batched queue processing #99

Closed
__tests__/migrate.test.ts (2 changes: 1 addition & 1 deletion)

@@ -33,7 +33,7 @@ test("migration installs schema; second migration does no harm", async () => {
const { rows: migrationRows } = await pgClient.query(
`select * from ${ESCAPED_GRAPHILE_WORKER_SCHEMA}.migrations`,
);
- expect(migrationRows).toHaveLength(5);
+ expect(migrationRows).toHaveLength(6);
const migration = migrationRows[0];
expect(migration.id).toEqual(1);

__tests__/schema.sql (104 changes: 104 additions & 0 deletions)

@@ -125,6 +125,49 @@ begin
end;
$$;
ALTER FUNCTION graphile_worker.add_job(identifier text, payload json, queue_name text, run_at timestamp with time zone, max_attempts integer, job_key text, priority integer, flags text[]) OWNER TO graphile_worker_role;
CREATE FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) RETURNS void
Member commented:

I don't like the mixture of json and array here; I'd rather that the argument was just json and we used json_array_elements.

Suggested change:
- CREATE FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) RETURNS void
+ CREATE FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json) RETURNS void

However, I note a more troubling issue. PostgreSQL's JSON supports arbitrary-precision numbers (I think); however, Node is limited to IEEE 754 64-bit floats, which gives ~53 bits of integer safety, which is not enough to cover the bigint size of Graphile Worker's PKs. We can get around this by encoding the ID as a string, but we must be very careful to always do so on the calling side. I've not finished reading the code yet so haven't checked, but if there isn't already a note on this we should add one.
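A minimal, self-contained sketch of the json + json_array_elements approach discussed above (the id and message values are invented); the id travels as a string, so a value beyond Node's 2^53 safe-integer range still round-trips exactly:

-- Invented example payload; in complete_batch this would be the single `failures` json argument.
-- 9007199254740993 is 2^53 + 1, not exactly representable as a JS number, but fine as a string.
select (f->>'id')::bigint as job_id, f->>'message' as last_error
from json_array_elements('[{"id": "9007199254740993", "message": "boom"}]'::json) f;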

Member commented:

I wonder if it would be better to use a composite type here for failures?

create type graphile_worker.failure_info as (
  job_id bigint,
  failure_message text
);

Contributor (author) commented:

failures json seems a bit ambiguous to me, because we will always have an array here. I think a composite type makes sense; I'll have to double-check what my original reasoning was for not doing that, but it might not have been anything in particular.
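For comparison, a rough sketch of how the composite-type variant could be fed and consumed, assuming the failure_info type suggested above has been created (the ids and messages are invented):

-- Building a failure_info[] value (roughly what the calling side would have to send):
select f.job_id, f.failure_message
from unnest(array[
  (101, 'boom')::graphile_worker.failure_info,
  (102, 'timeout')::graphile_worker.failure_info
]) f;
-- Inside complete_batch the join would then lose the json casts:
--   from unnest(failures) f
--   where id = f.job_id and locked_by = worker_id

A possible trade-off is that composing composite-typed arrays from the Node client is clumsier than sending a JSON string.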

LANGUAGE plpgsql STRICT
AS $$
declare
v_row "graphile_worker".jobs;
begin
if array_length(failures, 1) is not null then
with fail_jobs as (
update "graphile_worker".jobs
set
last_error = f->>'message',
run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval,
locked_by = null,
locked_at = null
from unnest(failures) f
where id = (f->>'id')::bigint and locked_by = worker_id
returning queue_name
)
update "graphile_worker".job_queues jq
set locked_by = null, locked_at = null
where exists (
select 1
from fail_jobs f
where jq.queue_name = f.queue_name
) and locked_by = worker_id;
end if;
if array_length(success_ids, 1) is not null then
with success_jobs as (
delete from "graphile_worker".jobs
where id = any(success_ids)
returning queue_name
)
update "graphile_worker".job_queues jq
set locked_by = null, locked_at = null
where exists (
select 1
from success_jobs s
where jq.queue_name = s.queue_name
) and locked_by = worker_id;
end if;
end;
$$;
Member commented:

If you drop the if statements you could convert this to sql rather than plpgsql, which might have an impact on performance. Maybe.

Contributor (author) commented:

I think this is a good idea anyway; always better to use plain sql if possible, imo.
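A sketch of what that plain-SQL version might look like, with the if statements and the unused v_row declaration dropped; a null or empty array simply matches zero rows, so the guards aren't needed (this illustrates the suggestion, it is not code from the PR):

create or replace function :GRAPHILE_WORKER_SCHEMA.complete_batch(
  worker_id text,
  success_ids bigint[],
  failures json[]
) returns void as $$
  -- Failures: with no plpgsql guard, an empty or null failures array updates zero rows.
  with fail_jobs as (
    update :GRAPHILE_WORKER_SCHEMA.jobs
    set
      last_error = f->>'message',
      run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval,
      locked_by = null,
      locked_at = null
    from unnest(failures) f
    where id = (f->>'id')::bigint and locked_by = worker_id
    returning queue_name
  )
  update :GRAPHILE_WORKER_SCHEMA.job_queues jq
  set locked_by = null, locked_at = null
  where exists (
    select 1 from fail_jobs f where jq.queue_name = f.queue_name
  ) and locked_by = worker_id;

  -- Successes: likewise a no-op when success_ids is empty or null.
  with success_jobs as (
    delete from :GRAPHILE_WORKER_SCHEMA.jobs
    where id = any(success_ids)
    returning queue_name
  )
  update :GRAPHILE_WORKER_SCHEMA.job_queues jq
  set locked_by = null, locked_at = null
  where exists (
    select 1 from success_jobs s where jq.queue_name = s.queue_name
  ) and locked_by = worker_id;
$$ language sql volatile strict;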

ALTER FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) OWNER TO graphile_worker_role;
CREATE FUNCTION graphile_worker.complete_job(worker_id text, job_id bigint) RETURNS graphile_worker.jobs
LANGUAGE plpgsql
AS $$
@@ -235,6 +278,67 @@ begin
end;
$$;
ALTER FUNCTION graphile_worker.get_job(worker_id text, task_identifiers text[], job_expiry interval, forbidden_flags text[]) OWNER TO graphile_worker_role;
CREATE FUNCTION graphile_worker.get_jobs(worker_id text, task_identifiers text[] DEFAULT NULL::text[], job_expiry interval DEFAULT '04:00:00'::interval, forbidden_flags text[] DEFAULT NULL::text[]) RETURNS SETOF graphile_worker.jobs
LANGUAGE plpgsql
AS $$
declare
v_now timestamptz = now();
begin
if worker_id is null or length(worker_id) < 10 then
raise exception 'invalid worker id';
end if;
return query with jobs_q as (
select
id, queue_name
from "graphile_worker".jobs jobs
where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry))
and (
jobs.queue_name is null
or
exists (
select 1
from "graphile_worker".job_queues
where job_queues.queue_name = jobs.queue_name
and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry))
for update
skip locked
)
)
and run_at <= v_now
and attempts < max_attempts
and (task_identifiers is null or task_identifier = any(task_identifiers))
and (forbidden_flags is null or (flags ?| forbidden_flags) is not true)
order by priority asc, run_at asc, id asc
-- TODO make this a configurable value
limit 1
Member commented:

Make job_count the first argument to get_jobs.

Also, maintaining both get_job and get_jobs is likely to be a pain; let's switch worker to using get_jobs (assuming no performance cost) and replace get_job(...) with a call to get_jobs(1, ...) for legacy support (a sketch of such a wrapper follows the function below).

for update
skip locked
),
queues_q as (
update "graphile_worker".job_queues
set
locked_by = worker_id,
locked_at = v_now
where exists(
select 1
from jobs_q
where jobs_q.queue_name = job_queues.queue_name
)
)
update "graphile_worker".jobs
set
attempts = attempts + 1,
locked_by = worker_id,
locked_at = v_now
where exists(
select 1
from jobs_q
where jobs_q.id = "graphile_worker".jobs.id
)
returning *;
end;
$$;
ALTER FUNCTION graphile_worker.get_jobs(worker_id text, task_identifiers text[], job_expiry interval, forbidden_flags text[]) OWNER TO graphile_worker_role;
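Following the comment above, a hypothetical sketch of get_job kept as a thin legacy wrapper; it assumes get_jobs has been reworked to take a job_count first argument, which is not what the diff currently does:

create or replace function :GRAPHILE_WORKER_SCHEMA.get_job(
  worker_id text,
  task_identifiers text[] = null,
  job_expiry interval = interval '4 hours',
  forbidden_flags text[] = null
) returns :GRAPHILE_WORKER_SCHEMA.jobs as $$
  -- Hypothetical: assumes a get_jobs(job_count, worker_id, ...) signature per the review comment;
  -- defaults mirror get_jobs above.
  select *
  from :GRAPHILE_WORKER_SCHEMA.get_jobs(1, worker_id, task_identifiers, job_expiry, forbidden_flags)
  limit 1;
$$ language sql volatile;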
CREATE FUNCTION graphile_worker.jobs__decrease_job_queue_count() RETURNS trigger
LANGUAGE plpgsql
AS $$
sql/000006.sql (114 changes: 114 additions & 0 deletions)

@@ -0,0 +1,114 @@
create or replace function :GRAPHILE_WORKER_SCHEMA.get_jobs(
worker_id text,
task_identifiers text[] = null,
job_expiry interval = interval '4 hours',
forbidden_flags text[] = null
) returns setof :GRAPHILE_WORKER_SCHEMA.jobs as $$
declare
v_now timestamptz = now();
begin
if worker_id is null or length(worker_id) < 10 then
raise exception 'invalid worker id';
end if;

return query with jobs_q as (
select
id, queue_name
from :GRAPHILE_WORKER_SCHEMA.jobs jobs
where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry))
and (
jobs.queue_name is null
or
exists (
select 1
from :GRAPHILE_WORKER_SCHEMA.job_queues
where job_queues.queue_name = jobs.queue_name
and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry))
for update
skip locked
)
)
and run_at <= v_now
and attempts < max_attempts
and (task_identifiers is null or task_identifier = any(task_identifiers))
and (forbidden_flags is null or (flags ?| forbidden_flags) is not true)
order by priority asc, run_at asc, id asc

-- TODO make this a configurable value
limit 1

for update
skip locked
),
queues_q as (
update :GRAPHILE_WORKER_SCHEMA.job_queues
set
locked_by = worker_id,
locked_at = v_now
where exists(
select 1
from jobs_q
where jobs_q.queue_name = job_queues.queue_name
)
)
update :GRAPHILE_WORKER_SCHEMA.jobs
set
attempts = attempts + 1,
locked_by = worker_id,
locked_at = v_now
where exists(
select 1
from jobs_q
where jobs_q.id = :GRAPHILE_WORKER_SCHEMA.jobs.id
)
returning *;
end;
$$ language plpgsql volatile;


create or replace function :GRAPHILE_WORKER_SCHEMA.complete_batch(
worker_id text,
success_ids bigint[],
failures json[] -- format {"id": string, "message": string}
) returns void as $$
declare
v_row :GRAPHILE_WORKER_SCHEMA.jobs;
begin
if array_length(failures, 1) is not null then
with fail_jobs as (
update :GRAPHILE_WORKER_SCHEMA.jobs
set
last_error = f->>'message',
run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval,
locked_by = null,
locked_at = null
from unnest(failures) f
where id = (f->>'id')::bigint and locked_by = worker_id
returning queue_name
)
update :GRAPHILE_WORKER_SCHEMA.job_queues jq
set locked_by = null, locked_at = null
where exists (
select 1
from fail_jobs f
where jq.queue_name = f.queue_name
) and locked_by = worker_id;
end if;

if array_length(success_ids, 1) is not null then
with success_jobs as (
delete from :GRAPHILE_WORKER_SCHEMA.jobs
where id = any(success_ids)
returning queue_name
)
update :GRAPHILE_WORKER_SCHEMA.job_queues jq
set locked_by = null, locked_at = null
where exists (
select 1
from success_jobs s
where jq.queue_name = s.queue_name
) and locked_by = worker_id;
end if;

end;
$$ language plpgsql volatile strict;