From ad022a6ccbdabc536fe39b5c3d231dc5f1498568 Mon Sep 17 00:00:00 2001 From: Evan Tetzlaff Date: Thu, 4 Nov 2021 14:33:59 -0600 Subject: [PATCH 1/3] WIP - Converts messages published into Redis to json format to allow for language agnostic polling --- active_publisher.gemspec | 1 + lib/active_publisher/async/redis_adapter.rb | 2 +- .../redis_adapter/redis_multi_pop_queue.rb | 12 +- lib/active_publisher/message.rb | 19 ++- .../redis_adapter_benchmark_spec.rb | 26 ++++ .../redis_multi_pop_queue_spec.rb | 145 ++++-------------- spec/spec_helper.rb | 5 + 7 files changed, 93 insertions(+), 117 deletions(-) create mode 100644 spec/lib/active_publisher/async/redis_adapter/redis_adapter_benchmark_spec.rb diff --git a/active_publisher.gemspec b/active_publisher.gemspec index 7654b08..68f78d2 100644 --- a/active_publisher.gemspec +++ b/active_publisher.gemspec @@ -37,4 +37,5 @@ Gem::Specification.new do |spec| spec.add_development_dependency "pry" spec.add_development_dependency "rake" spec.add_development_dependency "rspec", "~> 3.2" + spec.add_development_dependency "rspec-benchmark" end diff --git a/lib/active_publisher/async/redis_adapter.rb b/lib/active_publisher/async/redis_adapter.rb index 74007b2..0f7cf50 100644 --- a/lib/active_publisher/async/redis_adapter.rb +++ b/lib/active_publisher/async/redis_adapter.rb @@ -42,7 +42,7 @@ def initialize(new_redis_pool) def publish(route, payload, exchange_name, options = {}) message = ::ActivePublisher::Message.new(route, payload, exchange_name, options) - queue << ::Marshal.dump(message) + queue << message.to_json flush_queue! if queue.size >= flush_min || options[:flush_queue] nil diff --git a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb index 80ecbf9..fb6d94b 100644 --- a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb +++ b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb @@ -1,3 +1,5 @@ +require "json" + module ActivePublisher module Async module RedisAdapter @@ -10,7 +12,7 @@ def initialize(redis_connection_pool, new_list_key) end def <<(message) - encoded_message = ::Marshal.dump(message) + encoded_message = message.to_json redis_pool.with do |redis| redis.rpush(list_key, encoded_message) @@ -24,7 +26,7 @@ def concat(*messages) encoded_messages = [] messages.each do |message| - encoded_messages << ::Marshal.dump(message) + encoded_messages << message.to_json end redis_pool.with do |redis| @@ -92,10 +94,12 @@ def shift(number) messages = [messages] unless messages.respond_to?(:each) shifted_messages = [] + messages.each do |message| next if message.nil? - - shifted_messages << ::Marshal.load(message) + # TODO: This should probably attempt to ::Marshal.load and fall back to JSON for + # apps cutting over to new serialization. + shifted_messages << ::ActivePublisher::Message.from_json(message) end shifted_messages diff --git a/lib/active_publisher/message.rb b/lib/active_publisher/message.rb index 6136684..813ff29 100644 --- a/lib/active_publisher/message.rb +++ b/lib/active_publisher/message.rb @@ -1,3 +1,20 @@ +require "json" module ActivePublisher - class Message < Struct.new(:route, :payload, :exchange_name, :options); end + class Message < Struct.new(:route, :payload, :exchange_name, :options) + class << self + def from_json(payload) + parsed = JSON.load(payload) + self.new( + parsed["route"], + parsed["payload"], + parsed["exchange_name"], + parsed["options"], + ) + end + end + + def to_json + self.to_h.to_json + end + end end diff --git a/spec/lib/active_publisher/async/redis_adapter/redis_adapter_benchmark_spec.rb b/spec/lib/active_publisher/async/redis_adapter/redis_adapter_benchmark_spec.rb new file mode 100644 index 0000000..dee8364 --- /dev/null +++ b/spec/lib/active_publisher/async/redis_adapter/redis_adapter_benchmark_spec.rb @@ -0,0 +1,26 @@ +describe ::ActivePublisher::Async::RedisAdapter::Adapter do + subject { described_class.new(redis_pool) } + let(:route) { "test" } + let(:payload) { "payload" } + let(:exchange_name) { "place" } + let(:options) { { :flush_queue => true, :test => :ok } } + let(:message) { ::ActivePublisher::Message.new(route, payload, exchange_name, options) } + let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } } + + describe "#publish.benchmark" do + before do + allow(::ActivePublisher::Message).to receive(:new).with(route, payload, exchange_name, options).and_return(message) + end + + it "can serialize messages to publish into redis in under 3ms" do + expect { + expect_any_instance_of(::Redis).to receive(:rpush) + subject.publish(route, payload, exchange_name, options) + }.to perform_under(3).ms.sample(1_000).times + end + end + + describe "#shutdown!" do + # This is called when the rspec finishes. I'm sure we can make this a better test. + end +end diff --git a/spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb b/spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb index 67257eb..61168b4 100644 --- a/spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb +++ b/spec/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue_spec.rb @@ -1,6 +1,8 @@ describe ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue do let(:list_key) { ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY } let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } } + let(:message) { ::ActivePublisher::Message.new('rtg.key', 'payload', 'some.exchange', {})} + let(:ten_messages) { 10.times.map { message } } subject { described_class.new(redis_pool, list_key) } describe "initialize with a redis_pool and list_key" do @@ -13,29 +15,18 @@ describe "#<<" do it "pushes 1 item on the list" do - subject << "derp" + subject << message expect(subject.size).to be 1 - expect(subject.pop_up_to(100)).to eq(["derp"]) + expect(subject.pop_up_to(100)).to eq([message]) end it "pushes 10 items on the list" do 10.times do - subject << "derp" + subject << message end expect(subject.size).to be 10 - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end end @@ -45,86 +36,40 @@ end it "pushes 1 item on the list" do - subject.concat("derp") + subject.concat(message) expect(subject.size).to be 1 - expect(subject.pop_up_to(100)).to eq(["derp"]) + expect(subject.pop_up_to(100)).to eq([message]) end it "pushes 10 items on the list" do 10.times do - subject.concat("derp") + subject.concat(message) end expect(subject.size).to be 10 - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end it "pushes 10 items on the list in single concat" do - subject.concat("derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp") + subject.concat(message, + message, + message, + message, + message, + message, + message, + message, + message, + message) expect(subject.size).to be 10 - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end it "pushes 10 items on the list in single concat (with array)" do - array = [ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp" - ] - - subject.concat(array) + subject.concat(ten_messages) expect(subject.size).to be 10 - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end end @@ -135,7 +80,7 @@ it "is false when a single item is inserted to the list_key List" do redis_pool.with do |redis| - redis.rpush(list_key, "derp") + redis.rpush(list_key, message.to_json) end expect(subject.empty?).to be false @@ -144,7 +89,7 @@ it "is false when ten items are inserted to the list_key List" do redis_pool.with do |redis| 10.times do - redis.rpush(list_key, "derp") + redis.rpush(list_key, message.to_json) end end @@ -159,31 +104,20 @@ it "returns 1 item when a single item is inserted to the list_key List" do redis_pool.with do |redis| - redis.rpush(list_key, ::Marshal.dump("derp")) + redis.rpush(list_key, message.to_json) end - expect(subject.pop_up_to(100)).to eq(["derp"]) + expect(subject.pop_up_to(100)).to eq([message]) end it "is 10 when ten items are inserted to the list_key List" do redis_pool.with do |redis| 10.times do - redis.rpush(list_key, ::Marshal.dump("derp")) + redis.rpush(list_key, message.to_json) end end - expect(subject.pop_up_to(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.pop_up_to(100)).to eq(ten_messages) end end @@ -194,31 +128,20 @@ it "returns 1 item when a single item is inserted to the list_key List" do redis_pool.with do |redis| - redis.rpush(list_key, ::Marshal.dump("derp")) + redis.rpush(list_key, message.to_json) end - expect(subject.shift(100)).to eq(["derp"]) + expect(subject.shift(100)).to eq([message]) end it "is 10 when ten items are inserted to the list_key List" do redis_pool.with do |redis| 10.times do - redis.rpush(list_key, ::Marshal.dump("derp")) + redis.rpush(list_key, message.to_json) end end - expect(subject.shift(100)).to eq([ - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - "derp", - ]) + expect(subject.shift(100)).to eq(ten_messages) end end @@ -229,7 +152,7 @@ it "is 1 when a single item is inserted to the list_key List" do redis_pool.with do |redis| - redis.rpush(list_key, "derp") + redis.rpush(list_key, message.to_json) end expect(subject.size).to be 1 diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index f6756f1..449f22b 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,12 +4,17 @@ require "fakeredis/rspec" require "active_publisher/async/redis_adapter" require "connection_pool" +require "rspec-benchmark" ::ActivePublisher::Async.publisher_adapter = ::ActivePublisher::Async::InMemoryAdapter::Adapter.new # Silence the logger $TESTING = true ::ActivePublisher::Logging.initialize_logger(nil) +RSpec.configure do |config| + config.include RSpec::Benchmark::Matchers +end + def verify_expectation_within(number_of_seconds, check_every = 0.02) waiting_since = ::Time.now begin From 367a894d912e5221ce28e310fb550e735c7845dd Mon Sep 17 00:00:00 2001 From: Evan Tetzlaff Date: Fri, 5 Nov 2021 10:52:14 -0600 Subject: [PATCH 2/3] base64 encoding for payload and new redis key --- active_publisher.gemspec | 1 - lib/active_publisher/async/redis_adapter.rb | 2 +- .../redis_adapter/redis_multi_pop_queue.rb | 2 -- lib/active_publisher/message.rb | 11 ++++++-- .../redis_adapter_benchmark_spec.rb | 26 ------------------- spec/spec_helper.rb | 5 ---- 6 files changed, 10 insertions(+), 37 deletions(-) delete mode 100644 spec/lib/active_publisher/async/redis_adapter/redis_adapter_benchmark_spec.rb diff --git a/active_publisher.gemspec b/active_publisher.gemspec index 68f78d2..7654b08 100644 --- a/active_publisher.gemspec +++ b/active_publisher.gemspec @@ -37,5 +37,4 @@ Gem::Specification.new do |spec| spec.add_development_dependency "pry" spec.add_development_dependency "rake" spec.add_development_dependency "rspec", "~> 3.2" - spec.add_development_dependency "rspec-benchmark" end diff --git a/lib/active_publisher/async/redis_adapter.rb b/lib/active_publisher/async/redis_adapter.rb index 0f7cf50..0610d72 100644 --- a/lib/active_publisher/async/redis_adapter.rb +++ b/lib/active_publisher/async/redis_adapter.rb @@ -6,7 +6,7 @@ module ActivePublisher module Async module RedisAdapter - REDIS_LIST_KEY = "ACTIVE_PUBLISHER_LIST".freeze + REDIS_LIST_KEY = "ACTIVE_PUBLISHER_LIST.V2".freeze def self.new(*args) ::ActivePublisher::Async::RedisAdapter::Adapter.new(*args) diff --git a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb index fb6d94b..516bea6 100644 --- a/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb +++ b/lib/active_publisher/async/redis_adapter/redis_multi_pop_queue.rb @@ -97,8 +97,6 @@ def shift(number) messages.each do |message| next if message.nil? - # TODO: This should probably attempt to ::Marshal.load and fall back to JSON for - # apps cutting over to new serialization. shifted_messages << ::ActivePublisher::Message.from_json(message) end diff --git a/lib/active_publisher/message.rb b/lib/active_publisher/message.rb index 813ff29..78908e4 100644 --- a/lib/active_publisher/message.rb +++ b/lib/active_publisher/message.rb @@ -1,4 +1,6 @@ +require "base64" require "json" + module ActivePublisher class Message < Struct.new(:route, :payload, :exchange_name, :options) class << self @@ -6,7 +8,7 @@ def from_json(payload) parsed = JSON.load(payload) self.new( parsed["route"], - parsed["payload"], + Base64.decode64(parsed["payload"]), parsed["exchange_name"], parsed["options"], ) @@ -14,7 +16,12 @@ def from_json(payload) end def to_json - self.to_h.to_json + { + route: self.route, + payload: Base64.encode64(self.payload), + exchange_name: self.exchange_name, + options: self.options, + }.to_json end end end diff --git a/spec/lib/active_publisher/async/redis_adapter/redis_adapter_benchmark_spec.rb b/spec/lib/active_publisher/async/redis_adapter/redis_adapter_benchmark_spec.rb deleted file mode 100644 index dee8364..0000000 --- a/spec/lib/active_publisher/async/redis_adapter/redis_adapter_benchmark_spec.rb +++ /dev/null @@ -1,26 +0,0 @@ -describe ::ActivePublisher::Async::RedisAdapter::Adapter do - subject { described_class.new(redis_pool) } - let(:route) { "test" } - let(:payload) { "payload" } - let(:exchange_name) { "place" } - let(:options) { { :flush_queue => true, :test => :ok } } - let(:message) { ::ActivePublisher::Message.new(route, payload, exchange_name, options) } - let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } } - - describe "#publish.benchmark" do - before do - allow(::ActivePublisher::Message).to receive(:new).with(route, payload, exchange_name, options).and_return(message) - end - - it "can serialize messages to publish into redis in under 3ms" do - expect { - expect_any_instance_of(::Redis).to receive(:rpush) - subject.publish(route, payload, exchange_name, options) - }.to perform_under(3).ms.sample(1_000).times - end - end - - describe "#shutdown!" do - # This is called when the rspec finishes. I'm sure we can make this a better test. - end -end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 449f22b..f6756f1 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -4,17 +4,12 @@ require "fakeredis/rspec" require "active_publisher/async/redis_adapter" require "connection_pool" -require "rspec-benchmark" ::ActivePublisher::Async.publisher_adapter = ::ActivePublisher::Async::InMemoryAdapter::Adapter.new # Silence the logger $TESTING = true ::ActivePublisher::Logging.initialize_logger(nil) -RSpec.configure do |config| - config.include RSpec::Benchmark::Matchers -end - def verify_expectation_within(number_of_seconds, check_every = 0.02) waiting_since = ::Time.now begin From 4328256a4a33570af61332780bf7170b15fb4db7 Mon Sep 17 00:00:00 2001 From: Evan Tetzlaff Date: Wed, 10 Nov 2021 12:51:20 -0700 Subject: [PATCH 3/3] march hare doesnt like string keys for options on a message publish --- lib/active_publisher/message.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/active_publisher/message.rb b/lib/active_publisher/message.rb index 78908e4..72f92af 100644 --- a/lib/active_publisher/message.rb +++ b/lib/active_publisher/message.rb @@ -1,5 +1,6 @@ require "base64" require "json" +require "active_support/core_ext/hash/keys" module ActivePublisher class Message < Struct.new(:route, :payload, :exchange_name, :options) @@ -10,7 +11,7 @@ def from_json(payload) parsed["route"], Base64.decode64(parsed["payload"]), parsed["exchange_name"], - parsed["options"], + parsed["options"].symbolize_keys, ) end end