Skip to content

Commit

Permalink
Merge pull request #427 from Shopify/use-rails-logger
Browse files Browse the repository at this point in the history
Use the Rails application logger
  • Loading branch information
etiennebarrie authored Sep 5, 2023
2 parents bd21ef6 + a758bc3 commit 0bfc557
Show file tree
Hide file tree
Showing 16 changed files with 125 additions and 157 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
### Main (unreleased)

- [367](https://github.com/Shopify/job-iteration/pull/367) - Iteration can use multiple Active Job backends simultaneously by inferring the interruption adapter from the job's `queue_adapter_name`. Iteration will now raise `JobIteration::Integrations::LoadError` if no interruption adapter is found for the job's queue adapter, instead of never interrupting the job. `JobIteration.interruption_adapter` and `.load_integrations` have been removed. `JobIteration::Integrations.register` has been added.
## v1.4.1 (Sep 5, 2023)

### Bug fixes

- [427](https://github.com/Shopify/job-iteration/pull/427) - Use the Rails application logger. Changes from [338](https://github.com/Shopify/job-iteration/pull/338) resulted in logging to the original value of ActiveJob.logger, not the one configured by the Rails application.

## v1.4.0 (Aug 23, 2023)

Expand Down
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ GIT
PATH
remote: .
specs:
job-iteration (1.4.0)
job-iteration (1.4.1)
activejob (>= 5.2)

GEM
Expand Down
3 changes: 1 addition & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ end
```

Iteration hooks into Sidekiq and Resque out of the box to support graceful interruption. No extra configuration is required.
Adapters for other Active Job backends can be registered with `JobIteration::Integrations.register("my_queue_adapter_name", object)`, where `object` must implement the `call` method returning `true` if the job must be interrupted and `false` otherwise.

## Guides

Expand Down Expand Up @@ -184,7 +183,7 @@ There a few configuration assumptions that are required for Iteration to work wi

**Why can't I just iterate in `#perform` method and do whatever I want?** You can, but then your job has to comply with a long list of requirements, such as the ones above. This creates leaky abstractions more easily, when instead we can expose a more powerful abstraction for developers--without exposing the underlying infrastructure.

**What happens when my job is interrupted?** A checkpoint will be persisted after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration.
**What happens when my job is interrupted?** A checkpoint will be persisted to Redis after the current `each_iteration`, and the job will be re-enqueued. Once it's popped off the queue, the worker will work off from the next iteration.

**What happens with retries?** An interruption of a job does not count as a retry. The iteration of job that caused the job to fail will be retried and progress will continue from there on.

Expand Down
43 changes: 40 additions & 3 deletions lib/job-iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@

require "active_job"
require_relative "./job-iteration/version"
require_relative "./job-iteration/integrations"
require_relative "./job-iteration/enumerator_builder"
require_relative "./job-iteration/iteration"
require_relative "./job-iteration/log_subscriber"

module JobIteration
IntegrationLoadError = Class.new(StandardError)

INTEGRATIONS = [:resque, :sidekiq]

extend self

attr_accessor :logger
attr_writer :logger

self.logger = ActiveJob::Base.logger
class << self
def logger
@logger || ActiveJob::Base.logger
end
end

# Use this to _always_ interrupt the job after it's been running for more than N seconds.
# @example
Expand Down Expand Up @@ -42,6 +49,11 @@ module JobIteration
# where the throttle backoff value will take precedence over this setting.
attr_accessor :default_retry_backoff

# Used internally for hooking into job processing frameworks like Sidekiq and Resque.
attr_accessor :interruption_adapter

self.interruption_adapter = -> { false }

# Set if you want to use your own enumerator builder instead of default EnumeratorBuilder.
# @example
#
Expand All @@ -53,4 +65,29 @@ module JobIteration
attr_accessor :enumerator_builder

self.enumerator_builder = JobIteration::EnumeratorBuilder

def load_integrations
loaded = nil
INTEGRATIONS.each do |integration|
load_integration(integration)
if loaded
raise IntegrationLoadError,
"#{loaded} integration has already been loaded, but #{integration} is also available. " \
"Iteration will only work with one integration."
end
loaded = integration
rescue LoadError
end
end

def load_integration(integration)
unless INTEGRATIONS.include?(integration)
raise IntegrationLoadError,
"#{integration} integration is not supported. Available integrations: #{INTEGRATIONS.join(", ")}"
end

require_relative "./job-iteration/integrations/#{integration}"
end
end

JobIteration.load_integrations unless ENV["ITERATION_DISABLE_AUTOCONFIGURE"]
8 changes: 1 addition & 7 deletions lib/job-iteration/active_record_enumerator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def records
def batches
cursor = finder_cursor
Enumerator.new(method(:size)) do |yielder|
while (records = instrument_next_batch(cursor))
while (records = cursor.next_batch(@batch_size))
yielder.yield(records, cursor_value(records.last)) if records.any?
end
end
Expand All @@ -43,12 +43,6 @@ def size

private

def instrument_next_batch(cursor)
ActiveSupport::Notifications.instrument("active_record_cursor.iteration") do
cursor.next_batch(@batch_size)
end
end

def cursor_value(record)
positions = @columns.map do |column|
attribute_name = column.to_s.split(".").last
Expand Down
37 changes: 0 additions & 37 deletions lib/job-iteration/integrations.rb

This file was deleted.

30 changes: 12 additions & 18 deletions lib/job-iteration/integrations/resque.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,21 @@

module JobIteration
module Integrations
module Resque
module IterationExtension
def initialize(*)
$resque_worker = self
super
end
end

# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(IterationExtension)
end
module ResqueIterationExtension # @private
def initialize(*) # @private
$resque_worker = self
super
end
end

class << self
def call
$resque_worker.try!(:shutdown?)
end
# @private
module ::Resque
class Worker
# The patch is required in order to call shutdown? on a Resque::Worker instance
prepend(ResqueIterationExtension)
end
end

JobIteration.interruption_adapter = -> { $resque_worker.try!(:shutdown?) }
end
end
18 changes: 8 additions & 10 deletions lib/job-iteration/integrations/sidekiq.rb
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
# frozen_string_literal: true

require "sidekiq"

module JobIteration
module Integrations
module Sidekiq
class << self
def call
if defined?(::Sidekiq::CLI) && (instance = ::Sidekiq::CLI.instance)
instance.launcher.stopping?
else
false
end
end
module Integrations # @private
JobIteration.interruption_adapter = -> do
if defined?(Sidekiq::CLI) && Sidekiq::CLI.instance
Sidekiq::CLI.instance.launcher.stopping?
else
false
end
end
end
Expand Down
6 changes: 1 addition & 5 deletions lib/job-iteration/iteration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ def retry_job(*, **)

private

def interruption_adapter
@interruption_adapter ||= JobIteration::Integrations.load(self.class.queue_adapter_name)
end

def enumerator_builder
JobIteration.enumerator_builder.new(self)
end
Expand Down Expand Up @@ -299,7 +295,7 @@ def job_should_exit?
return true
end

interruption_adapter.call || (defined?(super) && super)
JobIteration.interruption_adapter.call || (defined?(super) && super)
end

def handle_completed(completed)
Expand Down
4 changes: 2 additions & 2 deletions lib/job-iteration/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def call
# MyJob.perform_now
# end
def iterate_exact_times(n_times)
JobIteration::Integrations.stubs(:load).returns(StoppingSupervisor.new(n_times.size))
JobIteration.stubs(:interruption_adapter).returns(StoppingSupervisor.new(n_times.size))
end

# Stubs interruption adapter to interrupt the job after every sing iteration.
Expand All @@ -47,7 +47,7 @@ def mark_job_worker_as_interrupted
def stub_shutdown_adapter_to_return(value)
adapter = mock
adapter.stubs(:call).returns(value)
JobIteration::Integrations.stubs(:load).returns(adapter)
JobIteration.stubs(:interruption_adapter).returns(adapter)
end
end
end
2 changes: 1 addition & 1 deletion lib/job-iteration/version.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# frozen_string_literal: true

module JobIteration
VERSION = "1.4.0"
VERSION = "1.4.1"
end
77 changes: 40 additions & 37 deletions test/integration/integrations_test.rb
Original file line number Diff line number Diff line change
@@ -1,52 +1,55 @@
# frozen_string_literal: true

require "test_helper"

class IntegrationsTest < IterationUnitTest
class IterationJob < ActiveJob::Base
include JobIteration::Iteration

def build_enumerator(cursor:)
enumerator_builder.build_once_enumerator(cursor: cursor)
end

def each_iteration(*)
require "open3"

class IntegrationsTest < ActiveSupport::TestCase
test "will prevent loading two integrations" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
require 'job-iteration'
RUBBY
_stdout, stderr, status = run_ruby(rubby)

assert_equal false, status.success?
assert_match(/resque integration has already been loaded, but sidekiq is also available/, stderr)
end
end

class ResqueJob < IterationJob
self.queue_adapter = :resque
end

class SidekiqJob < IterationJob
self.queue_adapter = :sidekiq
end

test "loads multiple integrations" do
resque_job = ResqueJob.new.serialize
ActiveJob::Base.execute(resque_job)

sidekiq_job = SidekiqJob.new.serialize
ActiveJob::Base.execute(sidekiq_job)
test "successfully loads one (resque) integration" do
with_env("ITERATION_DISABLE_AUTOCONFIGURE", nil) do
rubby = <<~RUBBY
require 'bundler/setup'
# Remove sidekiq, only resque will be left
$LOAD_PATH.delete_if { |p| p =~ /sidekiq/ }
require 'job-iteration'
RUBBY
_stdout, _stderr, status = run_ruby(rubby)

assert_equal true, status.success?
end
end

test ".register accepts an object does implementing #call" do
JobIteration::Integrations.register(:registration_test, -> { true })
private

assert(JobIteration::Integrations.registered_integrations[:registration_test].call)
end
def run_ruby(body)
stdout, stderr, status = nil
Tempfile.open do |f|
f.write(body)
f.close

test ".register raises when the callable object does not implement #call" do
error = assert_raises(ArgumentError) do
JobIteration::Integrations.register("foo", "bar")
command = "ruby #{f.path}"
stdout, stderr, status = Open3.capture3(command)
end
assert_equal("Interruption adapter must respond to #call", error.message)
[stdout, stderr, status]
end

test "raises for unknown Active Job queue adapter names" do
error = assert_raises(JobIteration::Integrations::LoadError) do
JobIteration::Integrations.load("unknown")
end
assert_equal("Could not find integration for 'unknown'", error.message)
def with_env(variable, value)
original = ENV[variable]
ENV[variable] = value
yield
ensure
ENV[variable] = original
end
end
1 change: 1 addition & 0 deletions test/support/sidekiq/init.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "job-iteration"
require "job-iteration/integrations/sidekiq"

require "active_job"
require "i18n"
Expand Down
3 changes: 2 additions & 1 deletion test/test_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
$LOAD_PATH.unshift(File.expand_path("../../lib", __FILE__))
require "minitest/autorun"

ENV["ITERATION_DISABLE_AUTOCONFIGURE"] = "true"

require "job-iteration"
require "job-iteration/test_helper"

Expand Down Expand Up @@ -38,7 +40,6 @@ def enqueue_at(job, timestamp)
end

ActiveJob::Base.queue_adapter = :iteration_test
JobIteration::Integrations.register("iteration_test", -> { false })

class Product < ActiveRecord::Base
has_many :comments
Expand Down
Loading

0 comments on commit 0bfc557

Please sign in to comment.