From 6a75566cad54805ff05a50a8a4879aede54a7967 Mon Sep 17 00:00:00 2001 From: Karol Bucek Date: Mon, 14 Mar 2022 11:49:25 +0100 Subject: [PATCH] Fix: compatibility with all (>= 3.0) rufus-scheduler versions (#97) --- .travis.yml | 7 ++- CHANGELOG.md | 3 + Gemfile | 2 + lib/logstash/filters/jdbc/loader_schedule.rb | 56 ++++--------------- .../filters/jdbc/repeating_load_runner.rb | 2 - .../filters/jdbc/single_load_runner.rb | 4 -- lib/logstash/filters/jdbc_static.rb | 15 +++-- lib/logstash/inputs/jdbc.rb | 15 +---- lib/logstash/plugin_mixins/jdbc/scheduler.rb | 28 ++++++++++ logstash-integration-jdbc.gemspec | 6 +- .../jdbc/repeating_load_runner_spec.rb | 2 +- 11 files changed, 64 insertions(+), 76 deletions(-) diff --git a/.travis.yml b/.travis.yml index a50fc73..57da51d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,2 +1,7 @@ import: -- logstash-plugins/.ci:travis/travis.yml@1.x \ No newline at end of file + - logstash-plugins/.ci:travis/travis.yml@1.x + +env: + jobs: # test with old scheduler version (3.0 was locked in LS 7.x) + - ELASTIC_STACK_VERSION=7.x RUFUS_SCHEDULER_VERSION=3.0.9 + - ELASTIC_STACK_VERSION=6.x RUFUS_SCHEDULER_VERSION=3.0.9 \ No newline at end of file diff --git a/CHANGELOG.md b/CHANGELOG.md index c6b59ce..853ffa4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 5.2.4 + - Fix: compatibility with all (>= 3.0) rufus-scheduler versions [#97](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/97) + ## 5.2.3 - Performance: avoid contention on scheduler execution [#103](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/103) diff --git a/Gemfile b/Gemfile index 32cc6fb..b1d9bb4 100644 --- a/Gemfile +++ b/Gemfile @@ -9,3 +9,5 @@ if Dir.exist?(logstash_path) && use_logstash_source gem 'logstash-core', :path => "#{logstash_path}/logstash-core" gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api" end + +gem 'rufus-scheduler', ENV['RUFUS_SCHEDULER_VERSION'] if ENV['RUFUS_SCHEDULER_VERSION'] diff --git a/lib/logstash/filters/jdbc/loader_schedule.rb b/lib/logstash/filters/jdbc/loader_schedule.rb index 64344c2..bd7d6b6 100644 --- a/lib/logstash/filters/jdbc/loader_schedule.rb +++ b/lib/logstash/filters/jdbc/loader_schedule.rb @@ -4,60 +4,26 @@ module LogStash module Filters module Jdbc class LoaderSchedule < Validatable - attr_reader :schedule_frequency, :loader_schedule - - def to_log_string - message = "" - message.concat "these months in the year [#{@cronline.months.to_a.join(", ")}];" unless @cronline.months.nil? - message.concat "these days in the month [#{@cronline.days.to_a.join(", ")}];" unless @cronline.days.nil? - message.concat "these hours in the day [#{@cronline.hours.to_a.join(", ")}];" unless @cronline.hours.nil? - message.concat "these minutes in the hour [#{@cronline.minutes.to_a.join(", ")}];" unless @cronline.minutes.nil? - message.concat "these seconds in the minute [#{@cronline.seconds.to_a.join(", ")}]" unless @cronline.seconds.nil? - if !message.empty? - message.prepend "Scheduled for: " - end - message - end + attr_reader :loader_schedule private - def post_initialize - if valid? - # From the Rufus::Scheduler docs: - # By default, rufus-scheduler sleeps 0.300 second between every step. - # At each step it checks for jobs to trigger and so on. - # set the frequency to 2.5 seconds if we are not reloading in the seconds timeframe - # rufus scheduler thread should respond to stop quickly enough. - if only_seconds_set? - @schedule_frequency = 0.3 - else - @schedule_frequency = 2.5 - end - end - end - - - def only_seconds_set? - @cronline.seconds && - @cronline.minutes.nil? && - @cronline.hours.nil? && - @cronline.days.nil? && - @cronline.months.nil? - end - + # @overload def parse_options @loader_schedule = @options - unless @loader_schedule.is_a?(String) + if @loader_schedule.is_a?(String) + begin + # Rufus::Scheduler 3.0 - 3.6 methods signature: parse_cron(o, opts) + # since Rufus::Scheduler 3.7 methods signature: parse_cron(o, opts={}) + @cronline = Rufus::Scheduler.parse_cron(@loader_schedule, {}) + rescue => e + @option_errors << "The loader_schedule option is invalid: #{e.message}" + end + else @option_errors << "The loader_schedule option must be a string" end - begin - @cronline = Rufus::Scheduler::CronLine.new(@loader_schedule) - rescue => e - @option_errors << "The loader_schedule option is invalid: #{e.message}" - end - @valid = @option_errors.empty? end end diff --git a/lib/logstash/filters/jdbc/repeating_load_runner.rb b/lib/logstash/filters/jdbc/repeating_load_runner.rb index 05262c3..d7c5fde 100644 --- a/lib/logstash/filters/jdbc/repeating_load_runner.rb +++ b/lib/logstash/filters/jdbc/repeating_load_runner.rb @@ -3,8 +3,6 @@ module LogStash module Filters module Jdbc class RepeatingLoadRunner < SingleLoadRunner - # info - attr_reader :local, :loaders, :preloaders - def repeated_load local.repopulate_all(loaders) @reload_counter.increment diff --git a/lib/logstash/filters/jdbc/single_load_runner.rb b/lib/logstash/filters/jdbc/single_load_runner.rb index 6ee6d46..599a450 100644 --- a/lib/logstash/filters/jdbc/single_load_runner.rb +++ b/lib/logstash/filters/jdbc/single_load_runner.rb @@ -26,10 +26,6 @@ def initial_load def repeated_load end - def call - repeated_load - end - def reload_count @reload_counter.value end diff --git a/lib/logstash/filters/jdbc_static.rb b/lib/logstash/filters/jdbc_static.rb index b080620..ca55ea8 100644 --- a/lib/logstash/filters/jdbc_static.rb +++ b/lib/logstash/filters/jdbc_static.rb @@ -3,6 +3,7 @@ require "logstash/filters/base" require "logstash/namespace" require "logstash/plugin_mixins/ecs_compatibility_support" +require "logstash/plugin_mixins/jdbc/scheduler" require_relative "jdbc/loader" require_relative "jdbc/loader_schedule" require_relative "jdbc/repeating_load_runner" @@ -191,17 +192,15 @@ def prepare_runner @processor = Jdbc::LookupProcessor.new(@local_lookups, global_lookup_options) runner_args.unshift(@processor.local) if @loader_schedule - args = [] @loader_runner = Jdbc::RepeatingLoadRunner.new(*runner_args) @loader_runner.initial_load - cronline = Jdbc::LoaderSchedule.new(@loader_schedule) - cronline.to_log_string.tap do |msg| - logger.info("Scheduler operations: #{msg}") unless msg.empty? + @scheduler = LogStash::PluginMixins::Jdbc::Scheduler. + start_cron_scheduler(@loader_schedule, thread_name: "[#{id}]-jdbc_static__scheduler") { @loader_runner.repeated_load } + cron_job = @scheduler.cron_jobs.first + if cron_job + frequency = cron_job.respond_to?(:rough_frequency) ? cron_job.rough_frequency : cron_job.frequency + logger.info("Loaders will execute every #{frequency} seconds", loader_schedule: @loader_schedule) end - logger.info("Scheduler scan for work frequency is: #{cronline.schedule_frequency}") - rufus_args = {:max_work_threads => 1, :frequency => cronline.schedule_frequency} - @scheduler = Rufus::Scheduler.new(rufus_args) - @scheduler.cron(cronline.loader_schedule, @loader_runner) else @loader_runner = Jdbc::SingleLoadRunner.new(*runner_args) @loader_runner.initial_load diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 3ad54af..90b2236 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -295,19 +295,8 @@ def run(queue) load_driver if @schedule # input thread (Java) name example "[my-oracle] 1, - :thread_name => "[#{id}] 1.0, - ) - @scheduler.schedule_cron @schedule do - execute_query(queue) - end - + @scheduler = LogStash::PluginMixins::Jdbc::Scheduler. + start_cron_scheduler(@schedule, thread_name: "[#{id}] 3.0.9' + # plugin maintains compatibility with < 3.5 (3.0.9) + # but works with newer rufus-scheduler >= 3.5 as well + s.add_runtime_dependency 'rufus-scheduler' s.add_runtime_dependency 'logstash-mixin-ecs_compatibility_support', '~>1.3' s.add_runtime_dependency "logstash-mixin-validator_support", '~> 1.0' s.add_runtime_dependency "logstash-mixin-event_support", '~> 1.0' diff --git a/spec/filters/jdbc/repeating_load_runner_spec.rb b/spec/filters/jdbc/repeating_load_runner_spec.rb index eb7b7c2..b537687 100644 --- a/spec/filters/jdbc/repeating_load_runner_spec.rb +++ b/spec/filters/jdbc/repeating_load_runner_spec.rb @@ -17,7 +17,7 @@ module LogStash module Filters module Jdbc expect(local_db).to receive(:populate_all).once.with(loaders) expect(local_db).to receive(:repopulate_all).once.with(loaders) runner.initial_load - subject.call + subject.repeated_load end end end