Skip to content

Commit

Permalink
Fix: compatibility with all (>= 3.0) rufus-scheduler versions (#97)
Browse files Browse the repository at this point in the history
  • Loading branch information
kares authored Mar 14, 2022
1 parent 56ace65 commit 6a75566
Show file tree
Hide file tree
Showing 11 changed files with 64 additions and 76 deletions.
7 changes: 6 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
import:
- logstash-plugins/.ci:travis/[email protected]
- logstash-plugins/.ci:travis/[email protected]

env:
jobs: # test with old scheduler version (3.0 was locked in LS 7.x)
- ELASTIC_STACK_VERSION=7.x RUFUS_SCHEDULER_VERSION=3.0.9
- ELASTIC_STACK_VERSION=6.x RUFUS_SCHEDULER_VERSION=3.0.9
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.2.4
- Fix: compatibility with all (>= 3.0) rufus-scheduler versions [#97](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/97)

## 5.2.3
- Performance: avoid contention on scheduler execution [#103](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/103)

Expand Down
2 changes: 2 additions & 0 deletions Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ if Dir.exist?(logstash_path) && use_logstash_source
gem 'logstash-core', :path => "#{logstash_path}/logstash-core"
gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api"
end

gem 'rufus-scheduler', ENV['RUFUS_SCHEDULER_VERSION'] if ENV['RUFUS_SCHEDULER_VERSION']
56 changes: 11 additions & 45 deletions lib/logstash/filters/jdbc/loader_schedule.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,60 +4,26 @@

module LogStash module Filters module Jdbc
class LoaderSchedule < Validatable
attr_reader :schedule_frequency, :loader_schedule

def to_log_string
message = ""
message.concat "these months in the year [#{@cronline.months.to_a.join(", ")}];" unless @cronline.months.nil?
message.concat "these days in the month [#{@cronline.days.to_a.join(", ")}];" unless @cronline.days.nil?
message.concat "these hours in the day [#{@cronline.hours.to_a.join(", ")}];" unless @cronline.hours.nil?
message.concat "these minutes in the hour [#{@cronline.minutes.to_a.join(", ")}];" unless @cronline.minutes.nil?
message.concat "these seconds in the minute [#{@cronline.seconds.to_a.join(", ")}]" unless @cronline.seconds.nil?
if !message.empty?
message.prepend "Scheduled for: "
end
message
end
attr_reader :loader_schedule

private

def post_initialize
if valid?
# From the Rufus::Scheduler docs:
# By default, rufus-scheduler sleeps 0.300 second between every step.
# At each step it checks for jobs to trigger and so on.
# set the frequency to 2.5 seconds if we are not reloading in the seconds timeframe
# rufus scheduler thread should respond to stop quickly enough.
if only_seconds_set?
@schedule_frequency = 0.3
else
@schedule_frequency = 2.5
end
end
end


def only_seconds_set?
@cronline.seconds &&
@cronline.minutes.nil? &&
@cronline.hours.nil? &&
@cronline.days.nil? &&
@cronline.months.nil?
end

# @overload
def parse_options
@loader_schedule = @options

unless @loader_schedule.is_a?(String)
if @loader_schedule.is_a?(String)
begin
# Rufus::Scheduler 3.0 - 3.6 methods signature: parse_cron(o, opts)
# since Rufus::Scheduler 3.7 methods signature: parse_cron(o, opts={})
@cronline = Rufus::Scheduler.parse_cron(@loader_schedule, {})
rescue => e
@option_errors << "The loader_schedule option is invalid: #{e.message}"
end
else
@option_errors << "The loader_schedule option must be a string"
end

begin
@cronline = Rufus::Scheduler::CronLine.new(@loader_schedule)
rescue => e
@option_errors << "The loader_schedule option is invalid: #{e.message}"
end

@valid = @option_errors.empty?
end
end
Expand Down
2 changes: 0 additions & 2 deletions lib/logstash/filters/jdbc/repeating_load_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@

module LogStash module Filters module Jdbc
class RepeatingLoadRunner < SingleLoadRunner
# info - attr_reader :local, :loaders, :preloaders

def repeated_load
local.repopulate_all(loaders)
@reload_counter.increment
Expand Down
4 changes: 0 additions & 4 deletions lib/logstash/filters/jdbc/single_load_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,6 @@ def initial_load
def repeated_load
end

def call
repeated_load
end

def reload_count
@reload_counter.value
end
Expand Down
15 changes: 7 additions & 8 deletions lib/logstash/filters/jdbc_static.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
require "logstash/filters/base"
require "logstash/namespace"
require "logstash/plugin_mixins/ecs_compatibility_support"
require "logstash/plugin_mixins/jdbc/scheduler"
require_relative "jdbc/loader"
require_relative "jdbc/loader_schedule"
require_relative "jdbc/repeating_load_runner"
Expand Down Expand Up @@ -191,17 +192,15 @@ def prepare_runner
@processor = Jdbc::LookupProcessor.new(@local_lookups, global_lookup_options)
runner_args.unshift(@processor.local)
if @loader_schedule
args = []
@loader_runner = Jdbc::RepeatingLoadRunner.new(*runner_args)
@loader_runner.initial_load
cronline = Jdbc::LoaderSchedule.new(@loader_schedule)
cronline.to_log_string.tap do |msg|
logger.info("Scheduler operations: #{msg}") unless msg.empty?
@scheduler = LogStash::PluginMixins::Jdbc::Scheduler.
start_cron_scheduler(@loader_schedule, thread_name: "[#{id}]-jdbc_static__scheduler") { @loader_runner.repeated_load }
cron_job = @scheduler.cron_jobs.first
if cron_job
frequency = cron_job.respond_to?(:rough_frequency) ? cron_job.rough_frequency : cron_job.frequency
logger.info("Loaders will execute every #{frequency} seconds", loader_schedule: @loader_schedule)
end
logger.info("Scheduler scan for work frequency is: #{cronline.schedule_frequency}")
rufus_args = {:max_work_threads => 1, :frequency => cronline.schedule_frequency}
@scheduler = Rufus::Scheduler.new(rufus_args)
@scheduler.cron(cronline.loader_schedule, @loader_runner)
else
@loader_runner = Jdbc::SingleLoadRunner.new(*runner_args)
@loader_runner.initial_load
Expand Down
15 changes: 2 additions & 13 deletions lib/logstash/inputs/jdbc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -295,19 +295,8 @@ def run(queue)
load_driver
if @schedule
# input thread (Java) name example "[my-oracle]<jdbc"
@scheduler = LogStash::PluginMixins::Jdbc::Scheduler.new(
:max_work_threads => 1,
:thread_name => "[#{id}]<jdbc__scheduler",
# amount the scheduler thread sleeps between checking whether jobs
# should trigger, default is 0.3 which is a bit too often ...
# in theory the cron expression '* * * * * *' supports running jobs
# every second but this is very rare, we could potentially go higher
:frequency => 1.0,
)
@scheduler.schedule_cron @schedule do
execute_query(queue)
end

@scheduler = LogStash::PluginMixins::Jdbc::Scheduler.
start_cron_scheduler(@schedule, thread_name: "[#{id}]<jdbc__scheduler") { execute_query(queue) }
@scheduler.join
else
execute_query(queue)
Expand Down
28 changes: 28 additions & 0 deletions lib/logstash/plugin_mixins/jdbc/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,34 @@ class Scheduler < Rufus::Scheduler
TimeImpl = defined?(Rufus::Scheduler::EoTime) ? Rufus::Scheduler::EoTime :
(defined?(Rufus::Scheduler::ZoTime) ? Rufus::Scheduler::ZoTime : ::Time)

# @param cron [String] cron-line
# @param opts [Hash] scheduler options
# @return scheduler instance
def self.start_cron_scheduler(cron, opts = {}, &block)
unless block_given?
raise ArgumentError, 'missing (cron scheduler) block - worker task to execute'
end
scheduler = new_scheduler(opts)
scheduler.schedule_cron(cron, &block)
scheduler
end

# @param opts [Hash] scheduler options
# @return scheduler instance
def self.new_scheduler(opts)
unless opts.key?(:thread_name)
raise ArgumentError, 'thread_name: option is required to be able to distinguish multiple scheduler threads'
end
opts[:max_work_threads] ||= 1
# amount the scheduler thread sleeps between checking whether jobs
# should trigger, default is 0.3 which is a bit too often ...
# in theory the cron expression '* * * * * *' supports running jobs
# every second but this is very rare, we could potentially go higher
opts[:frequency] ||= 1.0

new(opts)
end

# @overload
def timeout_jobs
# Rufus relies on `Thread.list` which is a blocking operation and with many schedulers
Expand Down
6 changes: 4 additions & 2 deletions logstash-integration-jdbc.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-jdbc'
s.version = '5.2.3'
s.version = '5.2.4'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with JDBC - input and filter plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down Expand Up @@ -34,7 +34,9 @@ Gem::Specification.new do |s|

s.add_runtime_dependency 'tzinfo'
s.add_runtime_dependency 'tzinfo-data'
s.add_runtime_dependency 'rufus-scheduler', '~> 3.0.9'
# plugin maintains compatibility with < 3.5 (3.0.9)
# but works with newer rufus-scheduler >= 3.5 as well
s.add_runtime_dependency 'rufus-scheduler'
s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.3'
s.add_runtime_dependency "logstash-mixin-validator_support", '~> 1.0'
s.add_runtime_dependency "logstash-mixin-event_support", '~> 1.0'
Expand Down
2 changes: 1 addition & 1 deletion spec/filters/jdbc/repeating_load_runner_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ module LogStash module Filters module Jdbc
expect(local_db).to receive(:populate_all).once.with(loaders)
expect(local_db).to receive(:repopulate_all).once.with(loaders)
runner.initial_load
subject.call
subject.repeated_load
end
end
end
Expand Down

0 comments on commit 6a75566

Please sign in to comment.