Skip to content

Commit

Permalink
Merge pull request #53 from mxenabled/et/messages-per-batch
Browse files Browse the repository at this point in the history
Adds the abilit to specify batch sizes to add another tunable lever.
  • Loading branch information
ETetzlaff authored Jul 12, 2021
2 parents 069b310 + c90c2ac commit b6a6dd5
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module ActivePublisher
module Async
module InMemoryAdapter
class ConsumerThread
attr_reader :channel, :thread, :queue, :sampled_queue_size, :last_tick_at
attr_reader :channel, :flush_max, :thread, :queue, :sampled_queue_size, :last_tick_at

if ::RUBY_PLATFORM == "java"
CHANNEL_CLOSED_ERRORS = [::MarchHare::ChannelAlreadyClosed]
Expand All @@ -27,6 +27,7 @@ class ConsumerThread
def initialize(listen_queue)
@queue = listen_queue
@sampled_queue_size = queue.size
@flush_max = ::ActivePublisher.configuration.messages_per_batch

update_last_tick_at
start_thread
Expand Down Expand Up @@ -96,7 +97,7 @@ def start_consuming_thread
loop do
# Sample the queue size so we don't shutdown when messages are in flight.
@sampled_queue_size = queue.size
current_messages = queue.pop_up_to(50, :timeout => 0.1)
current_messages = queue.pop_up_to(flush_max, :timeout => 0.1)
update_last_tick_at
# If the queue is empty, we should continue to update to "last_tick_at" time.
next if current_messages.nil?
Expand Down
8 changes: 5 additions & 3 deletions lib/active_publisher/async/redis_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ class Adapter
}
include ::ActivePublisher::Logging

attr_reader :async_queue, :redis_pool, :queue
attr_reader :async_queue, :flush_max, :flush_min, :redis_pool, :queue

def initialize(new_redis_pool)
logger.info "Starting redis publisher adapter"
# do something with supervision ?
@redis_pool = new_redis_pool
@async_queue = ::ActivePublisher::Async::RedisAdapter::Consumer.new(redis_pool)
@queue = ::MultiOpQueue::Queue.new
@flush_max = ::ActivePublisher.configuration.messages_per_batch
@flush_min = @flush_max / 2

supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
queue_size = queue.size
Expand All @@ -41,7 +43,7 @@ def initialize(new_redis_pool)
def publish(route, payload, exchange_name, options = {})
message = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
queue << ::Marshal.dump(message)
flush_queue! if queue.size >= 20 || options[:flush_queue]
flush_queue! if queue.size >= flush_min || options[:flush_queue]

nil
end
Expand All @@ -58,7 +60,7 @@ def shutdown!

def flush_queue!
return if queue.empty?
encoded_messages = queue.pop_up_to(25, :timeout => 0.001)
encoded_messages = queue.pop_up_to(flush_max, :timeout => 0.001)

return if encoded_messages.nil?
return unless encoded_messages.respond_to?(:each)
Expand Down
2 changes: 2 additions & 0 deletions lib/active_publisher/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Configuration
:host,
:hosts,
:max_async_publisher_lag_time,
:messages_per_batch,
:network_recovery_interval,
:password,
:port,
Expand Down Expand Up @@ -36,6 +37,7 @@ class Configuration
:host => "localhost",
:hosts => [],
:password => "guest",
:messages_per_batch => 25,
:max_async_publisher_lag_time => 10,
:network_recovery_interval => NETWORK_RECOVERY_INTERVAL,
:port => 5672,
Expand Down
7 changes: 7 additions & 0 deletions spec/lib/active_publisher/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@
expect(::ActivePublisher.configuration.verify_peer).to eq(false)
end

it "can use messages_per_batch" do
expect(::ActivePublisher.configuration.messages_per_batch).to eq(25)
expect(::ActivePublisher.configuration).to receive(:messages_per_batch=).with(50).and_call_original
::ActivePublisher::Configuration.configure_from_yaml_and_cli({"messages_per_batch" => 50}, true)
expect(::ActivePublisher.configuration.messages_per_batch).to eq(50)
end

context "when using a yaml file" do
let!(:sample_yaml_location) { ::File.expand_path(::File.join("spec", "support", "sample_config.yml")) }

Expand Down

0 comments on commit b6a6dd5

Please sign in to comment.