Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP/PROPOSAL - Redis publishing format #56

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions active_publisher.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,5 @@ Gem::Specification.new do |spec|
spec.add_development_dependency "pry"
spec.add_development_dependency "rake"
spec.add_development_dependency "rspec", "~> 3.2"
spec.add_development_dependency "rspec-benchmark"
end
2 changes: 1 addition & 1 deletion lib/active_publisher/async/redis_adapter.rb
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def initialize(new_redis_pool)

def publish(route, payload, exchange_name, options = {})
message = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
queue << ::Marshal.dump(message)
queue << message.to_json
flush_queue! if queue.size >= flush_min || options[:flush_queue]

nil
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
require "json"

module ActivePublisher
module Async
module RedisAdapter
Expand All @@ -10,7 +12,7 @@ def initialize(redis_connection_pool, new_list_key)
end

def <<(message)
encoded_message = ::Marshal.dump(message)
encoded_message = message.to_json

redis_pool.with do |redis|
redis.rpush(list_key, encoded_message)
Expand All @@ -24,7 +26,7 @@ def concat(*messages)

encoded_messages = []
messages.each do |message|
encoded_messages << ::Marshal.dump(message)
encoded_messages << message.to_json
end

redis_pool.with do |redis|
Expand Down Expand Up @@ -92,10 +94,12 @@ def shift(number)
messages = [messages] unless messages.respond_to?(:each)

shifted_messages = []

messages.each do |message|
next if message.nil?

shifted_messages << ::Marshal.load(message)
# TODO: This should probably attempt to ::Marshal.load and fall back to JSON for
# apps cutting over to new serialization.
shifted_messages << ::ActivePublisher::Message.from_json(message)
end

shifted_messages
Expand Down
19 changes: 18 additions & 1 deletion lib/active_publisher/message.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
require "json"
module ActivePublisher
class Message < Struct.new(:route, :payload, :exchange_name, :options); end
class Message < Struct.new(:route, :payload, :exchange_name, :options)
class << self
def from_json(payload)
parsed = JSON.load(payload)
self.new(
parsed["route"],
parsed["payload"],
parsed["exchange_name"],
parsed["options"],
)
end
end

def to_json
self.to_h.to_json
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will probably want to wrap "payload" in base64 encoding to avoid parsing random bytes as a string in json (won't work).

end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
describe ::ActivePublisher::Async::RedisAdapter::Adapter do
subject { described_class.new(redis_pool) }
let(:route) { "test" }
let(:payload) { "payload" }
let(:exchange_name) { "place" }
let(:options) { { :flush_queue => true, :test => :ok } }
let(:message) { ::ActivePublisher::Message.new(route, payload, exchange_name, options) }
let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } }

describe "#publish.benchmark" do
before do
allow(::ActivePublisher::Message).to receive(:new).with(route, payload, exchange_name, options).and_return(message)
end

it "can serialize messages to publish into redis in under 3ms" do
expect {
expect_any_instance_of(::Redis).to receive(:rpush)
subject.publish(route, payload, exchange_name, options)
}.to perform_under(3).ms.sample(1_000).times
end
end

describe "#shutdown!" do
# This is called when the rspec finishes. I'm sure we can make this a better test.
end
end
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
describe ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue do
let(:list_key) { ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY }
let(:redis_pool) { ::ConnectionPool.new(:size => 5) { ::Redis.new } }
let(:message) { ::ActivePublisher::Message.new('rtg.key', 'payload', 'some.exchange', {})}
let(:ten_messages) { 10.times.map { message } }
subject { described_class.new(redis_pool, list_key) }

describe "initialize with a redis_pool and list_key" do
Expand All @@ -13,29 +15,18 @@

describe "#<<" do
it "pushes 1 item on the list" do
subject << "derp"
subject << message
expect(subject.size).to be 1
expect(subject.pop_up_to(100)).to eq(["derp"])
expect(subject.pop_up_to(100)).to eq([message])
end

it "pushes 10 items on the list" do
10.times do
subject << "derp"
subject << message
end

expect(subject.size).to be 10
expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end
end

Expand All @@ -45,86 +36,40 @@
end

it "pushes 1 item on the list" do
subject.concat("derp")
subject.concat(message)
expect(subject.size).to be 1
expect(subject.pop_up_to(100)).to eq(["derp"])
expect(subject.pop_up_to(100)).to eq([message])
end

it "pushes 10 items on the list" do
10.times do
subject.concat("derp")
subject.concat(message)
end

expect(subject.size).to be 10
expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end

it "pushes 10 items on the list in single concat" do
subject.concat("derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp")
subject.concat(message,
message,
message,
message,
message,
message,
message,
message,
message,
message)

expect(subject.size).to be 10
expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end

it "pushes 10 items on the list in single concat (with array)" do
array = [
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp"
]

subject.concat(array)
subject.concat(ten_messages)
expect(subject.size).to be 10
expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end
end

Expand All @@ -135,7 +80,7 @@

it "is false when a single item is inserted to the list_key List" do
redis_pool.with do |redis|
redis.rpush(list_key, "derp")
redis.rpush(list_key, message.to_json)
end

expect(subject.empty?).to be false
Expand All @@ -144,7 +89,7 @@
it "is false when ten items are inserted to the list_key List" do
redis_pool.with do |redis|
10.times do
redis.rpush(list_key, "derp")
redis.rpush(list_key, message.to_json)
end
end

Expand All @@ -159,31 +104,20 @@

it "returns 1 item when a single item is inserted to the list_key List" do
redis_pool.with do |redis|
redis.rpush(list_key, ::Marshal.dump("derp"))
redis.rpush(list_key, message.to_json)
end

expect(subject.pop_up_to(100)).to eq(["derp"])
expect(subject.pop_up_to(100)).to eq([message])
end

it "is 10 when ten items are inserted to the list_key List" do
redis_pool.with do |redis|
10.times do
redis.rpush(list_key, ::Marshal.dump("derp"))
redis.rpush(list_key, message.to_json)
end
end

expect(subject.pop_up_to(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.pop_up_to(100)).to eq(ten_messages)
end
end

Expand All @@ -194,31 +128,20 @@

it "returns 1 item when a single item is inserted to the list_key List" do
redis_pool.with do |redis|
redis.rpush(list_key, ::Marshal.dump("derp"))
redis.rpush(list_key, message.to_json)
end

expect(subject.shift(100)).to eq(["derp"])
expect(subject.shift(100)).to eq([message])
end

it "is 10 when ten items are inserted to the list_key List" do
redis_pool.with do |redis|
10.times do
redis.rpush(list_key, ::Marshal.dump("derp"))
redis.rpush(list_key, message.to_json)
end
end

expect(subject.shift(100)).to eq([
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
"derp",
])
expect(subject.shift(100)).to eq(ten_messages)
end
end

Expand All @@ -229,7 +152,7 @@

it "is 1 when a single item is inserted to the list_key List" do
redis_pool.with do |redis|
redis.rpush(list_key, "derp")
redis.rpush(list_key, message.to_json)
end

expect(subject.size).to be 1
Expand Down
5 changes: 5 additions & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,17 @@
require "fakeredis/rspec"
require "active_publisher/async/redis_adapter"
require "connection_pool"
require "rspec-benchmark"

::ActivePublisher::Async.publisher_adapter = ::ActivePublisher::Async::InMemoryAdapter::Adapter.new
# Silence the logger
$TESTING = true
::ActivePublisher::Logging.initialize_logger(nil)

RSpec.configure do |config|
config.include RSpec::Benchmark::Matchers
end

def verify_expectation_within(number_of_seconds, check_every = 0.02)
waiting_since = ::Time.now
begin
Expand Down