Skip to content

Commit

Permalink
Fix cron double-enqueue because delay close to 0.01 and possibly cloc…
Browse files Browse the repository at this point in the history
…k-drift (#1543)

* fix(good_job-1536): case of clock drift and delay close to 0.001

* Refactor to pass state through create_task instead of via ivar

* Fix yard typo

* When should have run in the past, run immediately

---------

Co-authored-by: Ben Sheldon [he/him] <[email protected]>
  • Loading branch information
ccouton and bensheldon authored Nov 21, 2024
1 parent 4f6d193 commit c102011
Showing 1 changed file with 24 additions and 12 deletions.
36 changes: 24 additions & 12 deletions lib/good_job/cron_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ def self.task_observer(time, output, thread_error) # rubocop:disable Lint/Unused
end

# Execution configuration to be scheduled
# @return [Hash]
# @return [Array<CronEntry>]
attr_reader :cron_entries

# @param cron_entries [Array<CronEntry>]
# @param start_on_initialize [Boolean]
# @param graceful_restart_period [ActiveSupport::Duration, nil]
# @param executor [Concurrent::Executor]
def initialize(cron_entries = [], start_on_initialize: false, graceful_restart_period: nil, executor: Concurrent.global_io_executor)
@executor = executor
@running = false
Expand Down Expand Up @@ -82,16 +84,26 @@ def shutdown?

# Enqueues a scheduled task
# @param cron_entry [CronEntry] the CronEntry object to schedule
# @param previously_at [Date, Time, ActiveSupport::TimeWithZone, nil] the last, +in-memory+, scheduled time the cron task was intended to run
def create_task(cron_entry, previously_at: nil)
cron_at = cron_entry.next_at(previously_at: previously_at)
delay = [(cron_at - Time.current).to_f, 0].max
future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at], executor: @executor) do |thr_scheduler, thr_cron_entry, thr_cron_at|
# Re-schedule the next cron task before executing the current task
thr_scheduler.create_task(thr_cron_entry, previously_at: thr_cron_at)

Rails.application.executor.wrap do
cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled?
# @param at [Time, nil] When a task needs to optionally be rescheduled because of clock-drift or other inaccuracy
# @param previously_at [Time, nil] the last +in-memory+ scheduled time the cron task was intended to run
def create_task(cron_entry, at: nil, previously_at: nil)
cron_at = at || cron_entry.next_at(previously_at: previously_at)

# ScheduledTask runs immediately if delay is <= 0.01; avoid ever scheduling the task before the intended time
# https://github.com/ruby-concurrency/concurrent-ruby/blob/56227a4c3ebdd53b8b0976eb8296ceb7a093496f/lib/concurrent-ruby/concurrent/executor/timer_set.rb#L97
delay = cron_at <= Time.current ? 0.0 : [(cron_at - Time.current).to_f, 0.02].max

future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at, previously_at], executor: @executor) do |thr_manager, thr_cron_entry, thr_cron_at|
if thr_cron_at && thr_cron_at > Time.current
# If clock drift or other inaccuracy, reschedule the task again
thr_manager.create_task(thr_cron_entry, at: thr_cron_at, previously_at: previously_at)
else
# Re-schedule the next cron task before executing the current task
thr_manager.create_task(thr_cron_entry, previously_at: thr_cron_at)

Rails.application.executor.wrap do
cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled?
end
end
end

Expand All @@ -108,7 +120,7 @@ def create_graceful_tasks(cron_entry)

time_period = @graceful_restart_period.ago..Time.current
cron_entry.within(time_period).each do |cron_at|
future = Concurrent::Future.new(args: [self, cron_entry, cron_at], executor: @executor) do |_thr_scheduler, thr_cron_entry, thr_cron_at|
future = Concurrent::Future.new(args: [self, cron_entry, cron_at], executor: @executor) do |_thr_manager, thr_cron_entry, thr_cron_at|
Rails.application.executor.wrap do
cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled?
end
Expand Down

0 comments on commit c102011

Please sign in to comment.