Skip to content

Commit

Permalink
Merge pull request #367 from bdewater/interruption-adapter-from-job
Browse files Browse the repository at this point in the history
Infer interruption handler from a job's queue adapter
  • Loading branch information
Mangara authored Aug 26, 2023
2 parents 7e6c385 + e07ddc0 commit 9ca5b6f
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 112 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
### Main (unreleased)

Nil
- [367](https://github.com/Shopify/job-iteration/pull/367) - Iteration can use multiple Active Job backends simultaneously by inferring the interruption adapter from the job's `queue_adapter_name`. `JobIteration.interruption_adapter` and `.load_integrations` have been removed. `JobIteration::Integrations.register` has been added.

## v1.4.0 (Aug 23, 2023)

Expand Down
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ end
```

Iteration hooks into Sidekiq and Resque out of the box to support graceful interruption. No extra configuration is required.
Adapters for other Active Job backends can be registered with `JobIteration::Integrations.register("my_queue_adapter_name", object)`, where `object` must implement the `call` method returning `true` if the job must be interrupted and `false` otherwise.

## Guides

Expand Down Expand Up @@ -183,7 +184,7 @@ There a few configuration assumptions that are required for Iteration to work wi

**Why can't I just iterate in `#perform` method and do whatever I want?** You can, but then your job has to comply with a long list of requirements, such as the ones above. This creates leaky abstractions more easily, when instead we can expose a more powerful abstraction for developers--without exposing the underlying infrastructure.

**What happens when my job is interrupted?** A checkpoint will be persisted to Redis after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration.
**What happens when my job is interrupted?** A checkpoint will be persisted after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration.

**What happens with retries?** An interruption of a job does not count as a retry. The iteration of job that caused the job to fail will be retried and progress will continue from there on.

Expand Down
35 changes: 1 addition & 34 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,12 @@

require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/integrations"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"

module JobIteration
IntegrationLoadError = Class.new(StandardError)

INTEGRATIONS = [:resque, :sidekiq]

extend self

attr_accessor :logger
Expand Down Expand Up @@ -45,11 +42,6 @@ module JobIteration
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter

self.interruption_adapter = -> { false }

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
#
Expand All @@ -61,29 +53,4 @@ module JobIteration
attr_accessor :enumerator_builder

self.enumerator_builder = JobIteration::EnumeratorBuilder

def load_integrations
loaded = nil
INTEGRATIONS.each do |integration|
load_integration(integration)
if loaded
raise IntegrationLoadError,
"#{loaded} integration has already been loaded, but #{integration} is also available. " \
"Iteration will only work with one integration."
end
loaded = integration
rescue LoadError
end
end

def load_integration(integration)
unless INTEGRATIONS.include?(integration)
raise IntegrationLoadError,
"#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}"
end

require_relative "./job-iteration/integrations/#{integration}"
end
end

JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"]
37 changes: 37 additions & 0 deletions lib/job-iteration/integrations.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# frozen_string_literal: true

module JobIteration
# @api private
module Integrations
LoadError = Class.new(StandardError)

extend self

attr_accessor :registered_integrations

self.registered_integrations = {}

autoload :Sidekiq, "job-iteration/integrations/sidekiq"
autoload :Resque, "job-iteration/integrations/resque"

# @api public
def register(name, callable)
raise ArgumentError, "Interruption adapter must respond to #call" unless callable.respond_to?(:call)

registered_integrations[name] = callable
end

def load(name)
if (callable = registered_integrations[name])
callable
else
begin
klass = "#{self}::#{name.camelize}".constantize
register(name, klass)
rescue NameError
raise LoadError, "Could not find integration for '#{name}'"
end
end
end
end
end
30 changes: 18 additions & 12 deletions lib/job-iteration/integrations/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,27 @@

module JobIteration
module Integrations
module ResqueIterationExtension # @private
def initialize(*) # @private
$resque_worker = self
super
module Resque
module IterationExtension
def initialize(*)
$resque_worker = self
super
end
end
end

# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(ResqueIterationExtension)
# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(IterationExtension)
end
end
end

JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) }
class << self
def call
$resque_worker.try!(:shutdown?)
end
end
end
end
end
18 changes: 10 additions & 8 deletions lib/job-iteration/integrations/sidekiq.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# frozen_string_literal: true

require "sidekiq"

module JobIteration
module Integrations # @private
JobIteration.interruption_adapter = -> do
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
else
false
module Integrations
module Sidekiq
class << self
def call
if defined?(::Sidekiq::CLI) && (instance = ::Sidekiq::CLI.instance)
instance.launcher.stopping?
else
false
end
end
end
end
end
Expand Down
6 changes: 5 additions & 1 deletion lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ def retry_job(*, **)

private

def interruption_adapter
@interruption_adapter ||= JobIteration::Integrations.load(self.class.queue_adapter_name)
end

def enumerator_builder
JobIteration.enumerator_builder.new(self)
end
Expand Down Expand Up @@ -295,7 +299,7 @@ def job_should_exit?
return true
end

JobIteration.interruption_adapter.call || (defined?(super) && super)
interruption_adapter.call || (defined?(super) && super)
end

def handle_completed(completed)
Expand Down
4 changes: 2 additions & 2 deletions lib/job-iteration/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def call
# MyJob.perform_now
# end
def iterate_exact_times(n_times)
JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size))
JobIteration::Integrations.stubs(:load).returns(StoppingSupervisor.new(n_times.size))
end

# Stubs interruption adapter to interrupt the job after every sing iteration.
Expand All @@ -47,7 +47,7 @@ def mark_job_worker_as_interrupted
def stub_shutdown_adapter_to_return(value)
adapter = mock
adapter.stubs(:call).returns(value)
JobIteration.stubs(:interruption_adapter).returns(adapter)
JobIteration::Integrations.stubs(:load).returns(adapter)
end
end
end
77 changes: 37 additions & 40 deletions test/integration/integrations_test.rb
Original file line number Diff line number Diff line change
@@ -1,55 +1,52 @@
# frozen_string_literal: true

require "test_helper"
require "open3"

class IntegrationsTest < ActiveSupport::TestCase
test "will prevent loading two integrations" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
require 'job-iteration'
RUBBY
_stdout, stderr, status = run_ruby(rubby)

assert_equal false, status.success?
assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr)

class IntegrationsTest < IterationUnitTest
class IterationJob < ActiveJob::Base
include JobIteration::Iteration

def build_enumerator(cursor:)
enumerator_builder.build_once_enumerator(cursor: cursor)
end
end

test "successfully loads one (resque) integration" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
# Remove sidekiq, only resque will be left
$LOAD_PATH.delete_if { |p| p =~ /sidekiq/ }
require 'job-iteration'
RUBBY
_stdout, _stderr, status = run_ruby(rubby)

assert_equal true, status.success?
def each_iteration(*)
end
end

private
class ResqueJob < IterationJob
self.queue_adapter = :resque
end

class SidekiqJob < IterationJob
self.queue_adapter = :sidekiq
end

test "loads multiple integrations" do
resque_job = ResqueJob.new.serialize
ActiveJob::Base.execute(resque_job)

sidekiq_job = SidekiqJob.new.serialize
ActiveJob::Base.execute(sidekiq_job)
end

test ".register accepts an object does implementing #call" do
JobIteration::Integrations.register(:registration_test, -> { true })

def run_ruby(body)
stdout, stderr, status = nil
Tempfile.open do |f|
f.write(body)
f.close
assert(JobIteration::Integrations.registered_integrations[:registration_test].call)
end

command = "ruby #{f.path}"
stdout, stderr, status = Open3.capture3(command)
test ".register raises when the callable object does not implement #call" do
error = assert_raises(ArgumentError) do
JobIteration::Integrations.register("foo", "bar")
end
[stdout, stderr, status]
assert_equal("Interruption adapter must respond to #call", error.message)
end

def with_env(variable, value)
original = ENV[variable]
ENV[variable] = value
yield
ensure
ENV[variable] = original
test "raises for unknown Active Job queue adapter names" do
error = assert_raises(JobIteration::Integrations::LoadError) do
JobIteration::Integrations.load("unknown")
end
assert_equal("Could not find integration for 'unknown'", error.message)
end
end
1 change: 0 additions & 1 deletion test/support/sidekiq/init.rb
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# frozen_string_literal: true

require "job-iteration"
require "job-iteration/integrations/sidekiq"

require "active_job"
require "i18n"
Expand Down
3 changes: 1 addition & 2 deletions test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "minitest/autorun"

ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"

require "job-iteration"
require "job-iteration/test_helper"

Expand Down Expand Up @@ -40,6 +38,7 @@ def enqueue_at(job, timestamp)
end

ActiveJob::Base.queue_adapter = :iteration_test
JobIteration::Integrations.register("iteration_test", -> { false })

class Product < ActiveRecord::Base
has_many :comments
Expand Down
10 changes: 0 additions & 10 deletions test/unit/active_job_iteration_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -724,16 +724,6 @@ def test_on_shutdown_called_before_reenqueue
assert_jobs_in_queue(1)
end

def test_mark_job_worker_as_interrupted
mark_job_worker_as_interrupted

assert_equal(true, JobIteration.interruption_adapter.call)

continue_iterating

assert_equal(false, JobIteration.interruption_adapter.call)
end

def test_reenqueue_self
iterate_exact_times(2.times)

Expand Down

0 comments on commit 9ca5b6f

Please sign in to comment.