Job key
When a job is added, you may opt to give it a "job key". Doing so will allow you to identify this job in future such that you can replace, update or remove it.
Be sure to read the job_key caveats below!
Replacing/updating jobs
Jobs scheduled with a job_key parameter may be replaced/updated by calling
add_job again with the same job_key value. This can be used for rescheduling
jobs, to ensure only one of a given job is scheduled at a time, or to update
other settings for the job.
For example after the below SQL transaction, the send_email job will run only
once, with the payload '{"count": 2}':
BEGIN;
SELECT graphile_worker.add_job('send_email', '{"count": 1}', job_key := 'abc');
SELECT graphile_worker.add_job('send_email', '{"count": 2}', job_key := 'abc');
COMMIT;
If both the previous and new versions of the job use array payloads they will be
merged; for example the process_invoices job will run once with the payload
[{id: 42},{id: 67}]:
BEGIN;
SELECT graphile_worker.add_job('process_invoices', '[{"id": 42}]', job_key := 'inv');
SELECT graphile_worker.add_job('process_invoices', '[{"id": 67}]', job_key := 'inv');
COMMIT;
In all cases if no match is found then a new job will be created.
job_key_mode
Behavior when an existing job with the same job key is found is controlled by
the job_key_mode setting:
replace(default) - overwrites the unlocked job with the new values (merging array payloads). This is primarily useful for rescheduling, updating, or debouncing (delaying execution until there have been no events for at least a certain time period). Locked jobs will cause a new job to be scheduled instead.preserve_run_at- overwrites the unlocked job with the new values (merging array payloads), but preservesrun_at. This is primarily useful for throttling (executing at most once over a given time period). Locked jobs will cause a new job to be scheduled instead.unsafe_dedupe- if an existing job is found, even if it is locked or permanently failed, then it won't be updated. This is very dangerous as it means that the event that triggered thisadd_jobcall may not result in any action. It is strongly advised you do not use this mode unless you are certain you know what you are doing.
The full job_key_mode algorithm is roughly as follows:
- If no existing job with the same job key is found:
- a new job will be created with the new attributes.
- Otherwise, if
job_key_modeisunsafe_dedupe:- stop and return the existing job.
- Otherwise, if the existing job is locked:
- it will have its
keycleared - it will have its attempts set to
max_attemptsto avoid it running again - a new job will be created with the new attributes.
- it will have its
- Otherwise, if the existing job has previously failed:
- it will have its
attemptsreset to 0 (as if it were newly scheduled) - it will have its
last_errorcleared - it will have all other attributes updated to their new values, including
run_at(even whenjob_key_modeispreserve_run_at).
- it will have its
- Otherwise, if
job_key_modeispreserve_run_at:- the job will have all its attributes except for
run_atupdated to their new values.
- the job will have all its attributes except for
- Otherwise:
- the job will have all its attributes updated to their new values.
Array payload merging
When updating an existing job via job_key (except in unsafe_dedupe mode), if
both the existing job's payload and the new payload are JSON arrays, they will
be concatenated rather than overwritten. This enables a batching pattern where
multiple events can be accumulated into a single job for more efficient
execution; see Handling batch jobs for more
info.
-- First call creates job with payload: [{"id": 1}]
SELECT graphile_worker.add_job(
'process_events',
'[{"id": 1}]'::json,
job_key := 'my_batch',
job_key_mode := 'preserve_run_at',
run_at := NOW() + INTERVAL '10 seconds'
);
-- Second call (before job runs) merges to: [{"id": 1}, {"id": 2}]
SELECT graphile_worker.add_job(
'process_events',
'[{"id": 2}]'::json,
job_key := 'my_batch',
job_key_mode := 'preserve_run_at',
run_at := NOW() + INTERVAL '10 seconds'
);
Combined with preserve_run_at job_key_mode, this creates a fixed batching
window: the job runs at the originally scheduled time with all accumulated
payloads merged together. With the default replace job_key_mode, each new
event would push the run_at forward, creating a rolling/debounce window
instead.
If either payload is not an array (e.g., one is an object, as is the default if no payload is specified), the standard replace behavior applies and the old payload will be lost.
Removing jobs
Pending jobs may also be removed using job_key:
SELECT graphile_worker.remove_job('abc');
job_key caveats
Jobs that complete successfully are deleted, there is no permanent job_key
log, i.e. remove_job on a completed job_key is a no-op as no row exists.
The job_key is treated as universally unique (whilst the job is
pending/failed), so you can update a job to have a completely different
task_identifier or payload. You must be careful to ensure that your
job_key is sufficiently unique to prevent you accidentally replacing or
deleting unrelated jobs by mistake; one way to approach this is to incorporate
the task_identifier into the job_key.
If a job is updated using add_job when it is currently locked (i.e. running),
a second job will be scheduled separately (unless
job_key_mode = 'unsafe_dedupe'), meaning both will run. (The old job will be
prevented from running again, and will have the job_key removed from it.)
Calling remove_job for a locked (i.e. running) job will not actually remove
it, but will prevent it from running again on failure.
There's currently a race condition in adding jobs with a job key which means
under very high contention of a specific key an add_job may fail and return
null. You should check for this null and handle it appropriately: retrying,
throwing an error, or however else makes sense to your code. See:
https://github.com/graphile/worker/issues/580 for more details.