Rename RETRYING -> RETRY and retrying_timeout to retry_timeout
 - Add documentation to the README for new retry functionality

 - On job execution failure, move directly to the FAILED state if
   max attempts have already been used.

 - Remove IO.inspect from test job perform

 - Document requirement for `perform` callback to have the same return
   type as `Repo.transaction/1`
mbuhot committed Oct 24, 2019
1 parent 3f33787 commit 9aa3aa1
Showing 9 changed files with 117 additions and 125 deletions.
12 changes: 10 additions & 2 deletions README.md
@@ -202,12 +202,19 @@ Control the time for which the job is reserved while waiting for a worker to pick it up.
config :ecto_job, :reservation_timeout, 15_000
```

Control the timeout for job execution before a job will be made available for retry. Begins when job is picked up by worker. Keep in mind, for jobs that are expected to retry quickly, any configured `execution_timeout` will only retry a job as quickly as the `poll_interval`. The default is `300_000` ms (5 mins).
Control the delay between retries following a job execution failure. Keep in mind, for jobs that are expected to retry quickly, any configured `retry_timeout` will only retry a job as quickly as the `poll_interval`. The default is `30_000` ms (30 seconds).

```
config :ecto_job, :retry_timeout, 30_000
```

Control the timeout for job execution before an "IN_PROGRESS" job is assumed to have failed. Begins when job is picked up by worker. Similarly to `retry_timeout`, any configured `execution_timeout` will only retry a job as quickly as the `poll_interval`. The default is `300_000` ms (5 mins).

```
config :ecto_job, :execution_timeout, 300_000
```


You can control whether logs are on or off and the log level. The defaults are `true` and `:info`.

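A minimal sketch of that configuration, assuming the same `config :ecto_job, key, value` form as the examples above (the `:log` and `:log_level` key names are taken from `EctoJob.Config` below):

```
config :ecto_job, :log, true
config :ecto_job, :log_level, :info
```
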
@@ -252,7 +259,8 @@ Once a consumer is given a job, it increments the attempt counter and updates the state to "IN_PROGRESS".
If the job is being retried, the expiry will be the initial timeout * the attempt counter.

If successful, the consumer can delete the job from the queue using the preloaded multi passed to the `perform/2` job handler.
If an exception is raised in the worker or a successful processing attempt fails to successfully commit the preloaded multi, the job is not deleted and remains in the "IN_PROGRESS" state until it expires.
If an exception is raised in the worker, or a processing attempt fails to commit the preloaded multi, the job is transitioned to the "RETRY" state and scheduled to run again after `retry_timeout` * the attempt counter.
If the process is killed or is otherwise unable to transition to "RETRY", the job will remain in "IN_PROGRESS" until the `execution_timeout` expires.
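
As a rough sketch of that scheduling arithmetic (illustrative only; `retry_timeout_ms` and `attempt` below are example bindings, not part of the library API):

```
retry_timeout_ms = 30_000
attempt = 2

# After the second failed attempt the job is scheduled roughly 60 seconds out;
# the delay grows linearly with the attempt counter.
next_run = DateTime.add(DateTime.utc_now(), attempt * retry_timeout_ms, :millisecond)
```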

Jobs in the "RESERVED" or "IN_PROGRESS" state past the expiry time will be returned to the "AVAILABLE" state.

2 changes: 1 addition & 1 deletion examples/ecto_job_demo/config/config.exs
@@ -9,7 +9,7 @@ config :ecto_job_demo, ecto_repos: [EctoJobDemo.Repo]
config :ecto_job_demo, EctoJobDemo.Repo,
adapter: Ecto.Adapters.Postgres,
username: "postgres",
password: "postgres",
password: "password",
database: "ecto_job_demo",
hostname: "localhost",
pool_size: 10
6 changes: 3 additions & 3 deletions lib/ecto_job/config.ex
@@ -24,7 +24,7 @@ defmodule EctoJob.Config do
- `poll_interval`: (Default `60_000`) Time in milliseconds between polling the `JobQueue` for scheduled jobs or jobs due to be retried
- `reservation_timeout`: (Default `60_000`) Time in ms during which a `RESERVED` job state is held while waiting for a worker to start the job. Subsequent polls will return the job to the `AVAILABLE` state for retry.
- `execution_timeout`: (Default `300_000`) Time in ms that a worker is allotted to hold a job in the `IN_PROGRESS` state before subsequent polls return a job to the `AVAILABLE` state for retry. The timeout is extended by `execution_timeout` for every retry attempt until `max_attempts` is reached for a given job.
- `retrying_timeout`: (Default `300_000`) Time in ms that a worker is allotted to hold a job in the `RETRYING` state before subsequent polls return a job to the `AVAILABLE` state for retry.
- `retry_timeout`: (Default `30_000`) Time in ms that a job will stay in the `RETRY` state before subsequent polls return a job to the `AVAILABLE` state for retry. The timeout is extended by `retry_timeout` for every retry attempt until `max_attempts` is reached for a given job.
- `notifications_listen_timeout`: (Default `5_000`) Time in milliseconds that Notifications.listen!/3 is allotted to start listening to notifications from postgrex for new jobs
"""

@@ -37,7 +37,7 @@ defmodule EctoJob.Config do
log_level: :info,
poll_interval: 60_000,
reservation_timeout: 60_000,
retrying_timeout: 300_000,
retry_timeout: 30_000,
execution_timeout: 300_000,
notifications_listen_timeout: 5_000

@@ -56,7 +56,7 @@ defmodule EctoJob.Config do
log_level: :info,
poll_interval: 60_000,
reservation_timeout: 60_000,
retrying_timeout: 300_000,
retry_timeout: 30_000,
execution_timeout: 300_000,
notifications_listen_timeout: 5_000
}
66 changes: 26 additions & 40 deletions lib/ecto_job/job_queue.ex
@@ -36,7 +36,7 @@ defmodule EctoJob.JobQueue do
- `"AVAILABLE"`: The job is available to be run by the next available worker
- `"RESERVED"`: The job has been reserved by a worker for execution
- `"IN_PROGRESS"`: The job is currently being worked
- `"RETRYING"`: The job has failed and it's waiting for a retry
- `"RETRY"`: The job has failed and it's waiting for a retry
- `"FAILED"`: The job has exceeded the `max_attempts` and will not be retried again
"""
@type state :: String.t()
@@ -63,13 +63,18 @@
@doc """
Job execution callback to be implemented by each `JobQueue` module.
The return type is the same as `Ecto.Repo.transaction/1`.
## Example
@impl true
def perform(multi, params = %{"type" => "new_user"}), do: NewUser.perform(multi, params)
def perform(multi, params = %{"type" => "sync_crm"}), do: SyncCRM.perform(multi, params)
"""
@callback perform(multi :: Multi.t(), params :: map) :: any()
@callback perform(multi :: Multi.t(), params :: map) ::
{:ok, any()}
| {:error, any()}
| {:error, Ecto.Multi.name(), any(), %{required(Ecto.Multi.name()) => any()}}
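
A hedged sketch of a handler that satisfies this callback; the module, repo, and params below are hypothetical, but ending the pipeline with `Repo.transaction/1` is what produces the required return shape:

```
defmodule MyApp.SendWelcomeEmail do
  # Hypothetical job handler: appends work to the preloaded multi and commits it,
  # so the return value is {:ok, changes} or one of the {:error, ...} tuples above.
  def perform(multi, %{"email" => email}) do
    multi
    |> Ecto.Multi.run(:send_email, fn _repo, _changes ->
      {:ok, deliver_welcome_email(email)}
    end)
    |> MyApp.Repo.transaction()
  end

  defp deliver_welcome_email(_email), do: :delivered
end
```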

defmacro __using__(opts) do
table_name = Keyword.fetch!(opts, :table_name)
@@ -208,7 +213,7 @@
end

@doc """
Updates all jobs in the `"SCHEDULED"` and `"RETRYING"` state with scheduled time <= now to `"AVAILABLE"` state.
Updates all jobs in the `"SCHEDULED"` and `"RETRY"` state with scheduled time <= now to `"AVAILABLE"` state.
Returns the number of jobs updated.
"""
@@ -218,7 +223,7 @@
repo.update_all(
Query.from(
job in schema,
where: job.state in ["SCHEDULED", "RETRYING"],
where: job.state in ["SCHEDULED", "RETRY"],
where: job.schedule <= ^now
),
set: [state: "AVAILABLE", updated_at: now]
@@ -269,26 +274,6 @@
count
end

@doc """
Updates all RETRYING jobs that have been attempted the maximum number of times to `"FAILED"`.
Returns the number of jobs updated.
"""
@spec fail_retrying_jobs_at_max_attempts(repo, schema, DateTime.t()) :: integer
def fail_retrying_jobs_at_max_attempts(repo, schema, now = %DateTime{}) do
{count, _} =
repo.update_all(
Query.from(
job in schema,
where: job.state in ["RETRYING"],
where: job.attempt >= job.max_attempts
),
set: [state: "FAILED", expires: nil, updated_at: now]
)

count
end

@doc """
Updates a batch of jobs in `"AVAILABLE"` state to `"RESERVED"` state with a timeout.
@@ -372,40 +357,41 @@
end

@doc """
Transitions a job from `"IN_PROGRESS"` to `"RETRYING"`.
Updates the state to `"RETRYING"` and changes the schedule time to
differentiate an expired job from one that had an exception or an error.
Transitions a job from `"IN_PROGRESS"` to `"RETRY"` or `"FAILED"` after an execution failure.
If the job has exceeded the configured `max_attempts`, the state is moved to `"FAILED"`;
otherwise the state is transitioned to `"RETRY"` and the schedule time is advanced so the
job will be picked up again.
"""
@spec update_job_to_retrying(repo, job, DateTime.t(), integer) ::
{:ok, Ecto.Schema.t()} | {:error, String.t}
def update_job_to_retrying(repo, job = %schema{}, now, timeout_ms) do
@spec job_failed(repo, job, DateTime.t(), integer) :: {:ok, job} | :error
def job_failed(repo, job = %schema{}, now, retry_timeout_ms) do
updates =
if job.attempt >= job.max_attempts do
[state: "FAILED", expires: nil]
else
[state: "RETRY", schedule: increase_time(now, job.attempt + 1, retry_timeout_ms)]
end

{count, results} =
repo.update_all(
Query.from(
j in schema,
where: j.id == ^job.id,
where: j.state == "IN_PROGRESS",
where: j.attempt == ^job.attempt,
select: j
),
[
set: [
state: "RETRYING",
schedule: increase_time(now, job.attempt + 1, timeout_ms),
updated_at: now
]
]
set: updates
)

case {count, results} do
{0, _} -> {:error, :wrong_state_when_retrying}
{0, _} -> :error
{1, [job]} -> {:ok, job}
end
end

@doc """
Computes the expiry time for an `"IN_PROGRESS"` and schedule time of "RETRYING" jobs based on the current time and attempt counter.
Computes the expiry time for `"IN_PROGRESS"` jobs and the schedule time for `"RETRY"` jobs based on the current time and attempt counter.
"""
@spec increase_time(DateTime.t(), integer, integer) :: DateTime.t()
def increase_time(now = %DateTime{}, attempt, timeout_ms) do
1 change: 0 additions & 1 deletion lib/ecto_job/producer.ex
@@ -149,7 +149,6 @@ defmodule EctoJob.Producer do
def handle_info(:poll, state = %State{repo: repo, schema: schema, clock: clock}) do
now = clock.()
_ = JobQueue.fail_expired_jobs_at_max_attempts(repo, schema, now)
_ = JobQueue.fail_retrying_jobs_at_max_attempts(repo, schema, now)
activate_jobs(repo, schema, now)
dispatch_jobs(state, now)
end
57 changes: 29 additions & 28 deletions lib/ecto_job/worker.ex
@@ -10,63 +10,64 @@ defmodule EctoJob.Worker do
@doc """
Equivalent to `start_link(config, job, DateTime.utc_now())`
"""
@spec start_link(Config.t, EctoJob.JobQueue.job()) :: {:ok, pid}
@spec start_link(Config.t(), EctoJob.JobQueue.job()) :: {:ok, pid}
def start_link(config, job), do: start_link(config, job, DateTime.utc_now())

@doc """
Start a worker process given a repo module and a job struct.
This may fail if the job reservation has expired, in which case the job will be
reactivated by the producer.
"""
@spec start_link(Config.t, EctoJob.JobQueue.job(), DateTime.t()) :: {:ok, pid}
@spec start_link(Config.t(), EctoJob.JobQueue.job(), DateTime.t()) :: {:ok, pid}
def start_link(config, job, now) do
Task.start_link(fn -> do_work(config, job, now) end)
end

@spec do_work(Config.t, EctoJob.JobQueue.job(), DateTime.t()) ::
:ok | {:ok, EctoJob.JobQueue.job()} | {:error, any()}
def do_work(config = %Config{repo: repo,
execution_timeout: exec_timeout,
retrying_timeout: retrying_timeout},
job,
now) do
@doc false
@spec do_work(Config.t(), EctoJob.JobQueue.job(), DateTime.t()) :: :ok | {:error, any()}
def do_work(config = %Config{repo: repo, execution_timeout: exec_timeout}, job, now) do
with {:ok, in_progress_job} <- JobQueue.update_job_in_progress(repo, job, now, exec_timeout),
response <- run_queue(config, in_progress_job),
true <- valid?(response) do
:ok <- run_job(config, in_progress_job) do
log_duration(config, in_progress_job, now)
notify_completed(repo, in_progress_job)
else
false -> JobQueue.update_job_to_retrying(repo, job, DateTime.utc_now(), retrying_timeout)
error -> error
{:error, reason} -> {:error, reason}
end
end

@spec run_queue(Config.t, EctoJob.JobQueue.job()) :: any()
defp run_queue(%Config{repo: repo, retrying_timeout: timeout}, job = %queue{}) do
@spec run_job(Config.t(), EctoJob.JobQueue.job()) :: :ok
defp run_job(%Config{repo: repo, retry_timeout: timeout}, job = %queue{}) do
job_failed = fn ->
_ = JobQueue.job_failed(repo, job, DateTime.utc_now(), timeout)
:ok
end

try do
queue.perform(JobQueue.initial_multi(job), job.params)
case queue.perform(JobQueue.initial_multi(job), job.params) do
{:ok, _value} -> :ok
{:error, _value} -> job_failed.()
{:error, _failed_operation, _failed_value, _changes_so_far} -> job_failed.()
end
rescue
e ->
# An exception occurred; attempt to put the job into the RETRY state
# before propagating the exception
stacktrace = System.stacktrace()

_ = JobQueue.update_job_to_retrying(repo, job, DateTime.utc_now(), timeout)

job_failed.()
reraise(e, stacktrace)
end
end

@spec valid?(any()) :: boolean()
defp valid?(:error), do: false

defp valid?(response) when is_tuple(response), do: :error != elem(response, 0)

defp valid?(_), do: true

@spec log_duration(Config.t, EctoJob.JobQueue.job(), DateTime.t()) :: :ok
defp log_duration(%Config{log: true, log_level: log_level}, _job = %queue{id: id}, start = %DateTime{}) do
@spec log_duration(Config.t(), EctoJob.JobQueue.job(), DateTime.t()) :: :ok
defp log_duration(
%Config{log: true, log_level: log_level},
_job = %queue{id: id},
start = %DateTime{}
) do
duration = DateTime.diff(DateTime.utc_now(), start, :microsecond)
Logger.log(log_level, fn -> "#{queue}[#{id}] done: #{duration} µs" end)
end

defp log_duration(_config, _job, _start), do: :ok

@spec notify_completed(repo, EctoJob.JobQueue.job()) :: :ok