-
-
Notifications
You must be signed in to change notification settings - Fork 103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
POC - batched queue processing #99
Changes from all commits
4adc26c
32f0787
04de4b5
ba6e034
6779565
28bc394
251c5f1
863184c
3ace8a6
8f6c6f8
c51f088
f51638f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,6 +125,49 @@ begin | |
end; | ||
$$; | ||
ALTER FUNCTION graphile_worker.add_job(identifier text, payload json, queue_name text, run_at timestamp with time zone, max_attempts integer, job_key text, priority integer, flags text[]) OWNER TO graphile_worker_role; | ||
CREATE FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) RETURNS void | ||
LANGUAGE plpgsql STRICT | ||
AS $$ | ||
declare | ||
v_row "graphile_worker".jobs; | ||
begin | ||
if array_length(failures, 1) is not null then | ||
with fail_jobs as ( | ||
update "graphile_worker".jobs | ||
set | ||
last_error = f->>'message', | ||
run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval, | ||
locked_by = null, | ||
locked_at = null | ||
from unnest(failures) f | ||
where id = (f->>'id')::bigint and locked_by = worker_id | ||
returning queue_name | ||
) | ||
update "graphile_worker".job_queues jq | ||
set locked_by = null, locked_at = null | ||
where exists ( | ||
select 1 | ||
from fail_jobs f | ||
where jq.queue_name = f.queue_name | ||
) and locked_by = worker_id; | ||
end if; | ||
if array_length(success_ids, 1) is not null then | ||
with success_jobs as ( | ||
delete from "graphile_worker".jobs | ||
where id = any(success_ids) | ||
returning queue_name | ||
) | ||
update "graphile_worker".job_queues jq | ||
set locked_by = null, locked_at = null | ||
where exists ( | ||
select 1 | ||
from success_jobs s | ||
where jq.queue_name = s.queue_name | ||
) and locked_by = worker_id; | ||
end if; | ||
end; | ||
$$; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you drop the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is a good idea anyway, always better to use plain sql if possible, imo |
||
ALTER FUNCTION graphile_worker.complete_batch(worker_id text, success_ids bigint[], failures json[]) OWNER TO graphile_worker_role; | ||
CREATE FUNCTION graphile_worker.complete_job(worker_id text, job_id bigint) RETURNS graphile_worker.jobs | ||
LANGUAGE plpgsql | ||
AS $$ | ||
|
@@ -235,6 +278,67 @@ begin | |
end; | ||
$$; | ||
ALTER FUNCTION graphile_worker.get_job(worker_id text, task_identifiers text[], job_expiry interval, forbidden_flags text[]) OWNER TO graphile_worker_role; | ||
CREATE FUNCTION graphile_worker.get_jobs(worker_id text, task_identifiers text[] DEFAULT NULL::text[], job_expiry interval DEFAULT '04:00:00'::interval, forbidden_flags text[] DEFAULT NULL::text[]) RETURNS SETOF graphile_worker.jobs | ||
LANGUAGE plpgsql | ||
AS $$ | ||
declare | ||
v_now timestamptz = now(); | ||
begin | ||
if worker_id is null or length(worker_id) < 10 then | ||
raise exception 'invalid worker id'; | ||
end if; | ||
return query with jobs_q as ( | ||
select | ||
id, queue_name | ||
from "graphile_worker".jobs jobs | ||
where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry)) | ||
and ( | ||
jobs.queue_name is null | ||
or | ||
exists ( | ||
select 1 | ||
from "graphile_worker".job_queues | ||
where job_queues.queue_name = jobs.queue_name | ||
and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry)) | ||
for update | ||
skip locked | ||
) | ||
) | ||
and run_at <= v_now | ||
and attempts < max_attempts | ||
and (task_identifiers is null or task_identifier = any(task_identifiers)) | ||
and (forbidden_flags is null or (flags ?| forbidden_flags) is not true) | ||
order by priority asc, run_at asc, id asc | ||
-- TODO make this a configurable value | ||
limit 1 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Make the job_count the first argument to Also maintaining both get_job and get_jobs is likely to be a pain; lets switch worker to using |
||
for update | ||
skip locked | ||
), | ||
queues_q as ( | ||
update "graphile_worker".job_queues | ||
set | ||
locked_by = worker_id, | ||
locked_at = v_now | ||
where exists( | ||
select 1 | ||
from jobs_q | ||
where jobs_q.queue_name = job_queues.queue_name | ||
) | ||
) | ||
update "graphile_worker".jobs | ||
set | ||
attempts = attempts + 1, | ||
locked_by = worker_id, | ||
locked_at = v_now | ||
where exists( | ||
select 1 | ||
from jobs_q | ||
where jobs_q.id = "graphile_worker".jobs.id | ||
) | ||
returning *; | ||
end; | ||
$$; | ||
ALTER FUNCTION graphile_worker.get_jobs(worker_id text, task_identifiers text[], job_expiry interval, forbidden_flags text[]) OWNER TO graphile_worker_role; | ||
CREATE FUNCTION graphile_worker.jobs__decrease_job_queue_count() RETURNS trigger | ||
LANGUAGE plpgsql | ||
AS $$ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
create or replace function :GRAPHILE_WORKER_SCHEMA.get_jobs( | ||
worker_id text, | ||
task_identifiers text[] = null, | ||
job_expiry interval = interval '4 hours', | ||
forbidden_flags text[] = null | ||
) returns setof :GRAPHILE_WORKER_SCHEMA.jobs as $$ | ||
declare | ||
v_now timestamptz = now(); | ||
begin | ||
if worker_id is null or length(worker_id) < 10 then | ||
raise exception 'invalid worker id'; | ||
end if; | ||
|
||
return query with jobs_q as ( | ||
select | ||
id, queue_name | ||
from :GRAPHILE_WORKER_SCHEMA.jobs jobs | ||
where (jobs.locked_at is null or jobs.locked_at < (v_now - job_expiry)) | ||
and ( | ||
jobs.queue_name is null | ||
or | ||
exists ( | ||
select 1 | ||
from :GRAPHILE_WORKER_SCHEMA.job_queues | ||
where job_queues.queue_name = jobs.queue_name | ||
and (job_queues.locked_at is null or job_queues.locked_at < (v_now - job_expiry)) | ||
for update | ||
skip locked | ||
) | ||
) | ||
and run_at <= v_now | ||
and attempts < max_attempts | ||
and (task_identifiers is null or task_identifier = any(task_identifiers)) | ||
and (forbidden_flags is null or (flags ?| forbidden_flags) is not true) | ||
order by priority asc, run_at asc, id asc | ||
|
||
-- TODO make this a configurable value | ||
limit 1 | ||
|
||
for update | ||
skip locked | ||
), | ||
queues_q as ( | ||
update :GRAPHILE_WORKER_SCHEMA.job_queues | ||
set | ||
locked_by = worker_id, | ||
locked_at = v_now | ||
where exists( | ||
select 1 | ||
from jobs_q | ||
where jobs_q.queue_name = job_queues.queue_name | ||
) | ||
) | ||
update :GRAPHILE_WORKER_SCHEMA.jobs | ||
set | ||
attempts = attempts + 1, | ||
locked_by = worker_id, | ||
locked_at = v_now | ||
where exists( | ||
select 1 | ||
from jobs_q | ||
where jobs_q.id = :GRAPHILE_WORKER_SCHEMA.jobs.id | ||
) | ||
returning *; | ||
end; | ||
$$ language plpgsql volatile; | ||
|
||
|
||
create or replace function :GRAPHILE_WORKER_SCHEMA.complete_batch( | ||
worker_id text, | ||
success_ids bigint[], | ||
failures json[] -- format {"id": string, "message": string} | ||
) returns void as $$ | ||
declare | ||
v_row :GRAPHILE_WORKER_SCHEMA.jobs; | ||
begin | ||
if array_length(failures, 1) is not null then | ||
with fail_jobs as ( | ||
update :GRAPHILE_WORKER_SCHEMA.jobs | ||
set | ||
last_error = f->>'message', | ||
run_at = greatest(now(), run_at) + (exp(least(attempts, 10))::text || ' seconds')::interval, | ||
locked_by = null, | ||
locked_at = null | ||
from unnest(failures) f | ||
where id = (f->>'id')::bigint and locked_by = worker_id | ||
returning queue_name | ||
) | ||
update :GRAPHILE_WORKER_SCHEMA.job_queues jq | ||
set locked_by = null, locked_at = null | ||
where exists ( | ||
select 1 | ||
from fail_jobs f | ||
where jq.queue_name = f.queue_name | ||
) and locked_by = worker_id; | ||
end if; | ||
|
||
if array_length(success_ids, 1) is not null then | ||
with success_jobs as ( | ||
delete from :GRAPHILE_WORKER_SCHEMA.jobs | ||
where id = any(success_ids) | ||
returning queue_name | ||
) | ||
update :GRAPHILE_WORKER_SCHEMA.job_queues jq | ||
set locked_by = null, locked_at = null | ||
where exists ( | ||
select 1 | ||
from success_jobs s | ||
where jq.queue_name = s.queue_name | ||
) and locked_by = worker_id; | ||
end if; | ||
|
||
end; | ||
$$ language plpgsql volatile strict; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like the mixture of json and array here; I'd rather that the argument was just
json
and we usedjson_array_elements
.However; I note a more troubling issue. PostgreSQL's JSON supports arbitrary precision numbers in JSON (I think); however Node is limited to IEEE754's 64bit floats, which gives ~53bits of integer safety, which is not enough to cover the
bigint
size of Graphile Worker's PKs. We can get around this by encoding the ID as a string, but we must be very careful to do so on the calling side always. I've not finished reading the code yet so haven't checked, but if there isn't already a note on this we should add one.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if it would be better to use a composite type here for failures?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failures json
seems a bit ambiguous to me, because we will always have an array here. I think composite type makes sense, I'll have to double check what my original reasoning was for not doing that, but it might not have been anything in particular