From c10201170f9bf87adcb7719f80b0210e9531b1fd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9dric=20Couton?= Date: Thu, 21 Nov 2024 16:03:37 +0100 Subject: [PATCH] Fix cron double-enqueue because delay close to 0.01 and possibly clock-drift (#1543) * fix(good_job-1536): case of clock drift and delay close to 0.001 * Refactor to pass state through create_task instead of via ivar * Fix yard typo * When should have run in the past, run immediately --------- Co-authored-by: Ben Sheldon [he/him] --- lib/good_job/cron_manager.rb | 36 ++++++++++++++++++++++++------------ 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/lib/good_job/cron_manager.rb b/lib/good_job/cron_manager.rb index a4973bde..6d72ad63 100644 --- a/lib/good_job/cron_manager.rb +++ b/lib/good_job/cron_manager.rb @@ -26,11 +26,13 @@ def self.task_observer(time, output, thread_error) # rubocop:disable Lint/Unused end # Execution configuration to be scheduled - # @return [Hash] + # @return [Array] attr_reader :cron_entries # @param cron_entries [Array] # @param start_on_initialize [Boolean] + # @param graceful_restart_period [ActiveSupport::Duration, nil] + # @param executor [Concurrent::Executor] def initialize(cron_entries = [], start_on_initialize: false, graceful_restart_period: nil, executor: Concurrent.global_io_executor) @executor = executor @running = false @@ -82,16 +84,26 @@ def shutdown? # Enqueues a scheduled task # @param cron_entry [CronEntry] the CronEntry object to schedule - # @param previously_at [Date, Time, ActiveSupport::TimeWithZone, nil] the last, +in-memory+, scheduled time the cron task was intended to run - def create_task(cron_entry, previously_at: nil) - cron_at = cron_entry.next_at(previously_at: previously_at) - delay = [(cron_at - Time.current).to_f, 0].max - future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at], executor: @executor) do |thr_scheduler, thr_cron_entry, thr_cron_at| - # Re-schedule the next cron task before executing the current task - thr_scheduler.create_task(thr_cron_entry, previously_at: thr_cron_at) - - Rails.application.executor.wrap do - cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? + # @param at [Time, nil] When a task needs to optionally be rescheduled because of clock-drift or other inaccuracy + # @param previously_at [Time, nil] the last +in-memory+ scheduled time the cron task was intended to run + def create_task(cron_entry, at: nil, previously_at: nil) + cron_at = at || cron_entry.next_at(previously_at: previously_at) + + # ScheduledTask runs immediately if delay is <= 0.01; avoid ever scheduling the task before the intended time + # https://github.com/ruby-concurrency/concurrent-ruby/blob/56227a4c3ebdd53b8b0976eb8296ceb7a093496f/lib/concurrent-ruby/concurrent/executor/timer_set.rb#L97 + delay = cron_at <= Time.current ? 0.0 : [(cron_at - Time.current).to_f, 0.02].max + + future = Concurrent::ScheduledTask.new(delay, args: [self, cron_entry, cron_at, previously_at], executor: @executor) do |thr_manager, thr_cron_entry, thr_cron_at| + if thr_cron_at && thr_cron_at > Time.current + # If clock drift or other inaccuracy, reschedule the task again + thr_manager.create_task(thr_cron_entry, at: thr_cron_at, previously_at: previously_at) + else + # Re-schedule the next cron task before executing the current task + thr_manager.create_task(thr_cron_entry, previously_at: thr_cron_at) + + Rails.application.executor.wrap do + cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? + end end end @@ -108,7 +120,7 @@ def create_graceful_tasks(cron_entry) time_period = @graceful_restart_period.ago..Time.current cron_entry.within(time_period).each do |cron_at| - future = Concurrent::Future.new(args: [self, cron_entry, cron_at], executor: @executor) do |_thr_scheduler, thr_cron_entry, thr_cron_at| + future = Concurrent::Future.new(args: [self, cron_entry, cron_at], executor: @executor) do |_thr_manager, thr_cron_entry, thr_cron_at| Rails.application.executor.wrap do cron_entry.enqueue(thr_cron_at) if thr_cron_entry.enabled? end