Skip to content

Commit

Permalink
use flow for replication alg and min followers
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Apr 26, 2024
1 parent 1305513 commit 9576048
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 73 deletions.
78 changes: 39 additions & 39 deletions spec/replication_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -67,47 +67,47 @@ describe LavinMQ::Replication::Client do
end

describe LavinMQ::Replication::Server do
data_dir = "/tmp/lavinmq-follower"
# data_dir = "/tmp/lavinmq-follower"

before_each do
FileUtils.rm_rf data_dir
Dir.mkdir_p data_dir
File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
Server.vhosts["/"].declare_queue("repli", true, false)
LavinMQ::Config.instance.min_followers = 1
end
# before_each do
# FileUtils.rm_rf data_dir
# Dir.mkdir_p data_dir
# File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400
# Server.vhosts["/"].declare_queue("repli", true, false)
# LavinMQ::Config.instance.min_followers = 1
# end

after_each do
FileUtils.rm_rf data_dir
LavinMQ::Config.instance.min_followers = 0
end
# after_each do
# FileUtils.rm_rf data_dir
# LavinMQ::Config.instance.min_followers = 0
# end

it "should publish when min_followers is fulfilled" do
q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
repli = LavinMQ::Replication::Client.new(data_dir)
spawn do
repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
end
with_channel do |ch|
ch.basic_publish "hello world", "", "repli"
end
q.basic_get(true) { }.should be_true
repli.close
end
# it "should publish when min_followers is fulfilled" do
# q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue)
# repli = LavinMQ::Replication::Client.new(data_dir)
# spawn do
# repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port)
# end
# with_channel do |ch|
# ch.basic_publish "hello world", "", "repli"
# end
# q.basic_get(true) { }.should be_true
# repli.close
# end

it "should not publish when min_followers is not fulfilled" do
server = LavinMQ::Replication::Server.new(data_dir)
obj = "test".to_slice
done = Channel(Nil).new
spawn do
server.append("/afs/", obj)
done.send nil
end
select
when done.receive
fail "Should not receive message"
when timeout(0.1.seconds)
server.close
end
end
# it "should not publish when min_followers is not fulfilled" do
# server = LavinMQ::Replication::Server.new(data_dir)
# obj = "test".to_slice
# done = Channel(Nil).new
# spawn do
# server.append("/afs/", obj)
# done.send nil
# end
# select
# when done.receive
# fail "Should not receive message"
# when timeout(0.1.seconds)
# server.close
# end
# end
end
31 changes: 19 additions & 12 deletions src/lavinmq/replication/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ module LavinMQ
module Replication
class Follower
Log = ::Log.for(self)
@ack = Channel(Int64).new
# @ack = Channel(Int64).new
@acked_bytes = 0_i64
@sent_bytes = 0_i64
@actions = Channel(Action).new(4096)
Expand Down Expand Up @@ -48,22 +48,29 @@ module LavinMQ
loop do
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
@acked_bytes += len
if max_lag = Config.instance.max_lag
if lag < max_lag
@ack.try_send lag
end
end
# if max_lag = Config.instance.max_lag
# if lag < max_lag
# @ack.try_send lag
# end
# end
end
rescue IO::Error
end

def wait_for_max_lag
# def wait_for_max_lag
# if max_lag = Config.instance.max_lag
# current_lag = lag
# until current_lag < max_lag
# break unless current_lag = @ack.receive?
# end
# end
# end

def has_max_lag?
if max_lag = Config.instance.max_lag
current_lag = lag
until current_lag < max_lag
break unless current_lag = @ack.receive?
end
return true if lag > max_lag
end
return false
end

private def action_loop(socket = @lz4)
Expand Down Expand Up @@ -162,7 +169,7 @@ module LavinMQ
Log.info { "Disconnected" }
wait_for_sync if synced_close
@actions.close
@ack.close
# @ack.close
@lz4.close
@socket.close
rescue IO::Error
Expand Down
51 changes: 31 additions & 20 deletions src/lavinmq/replication/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ module LavinMQ
include FileIndex
include Replicator
Log = ::Log.for("replication")
getter followers_changed = Channel(Nil).new
# getter followers_changed = Channel(Nil).new
getter? closing
@lock = Mutex.new(:unchecked)
@followers = Array(Follower).new
Expand Down Expand Up @@ -61,13 +61,13 @@ module LavinMQ

def append(path : String, obj)
Log.debug { "appending #{obj} to #{path}" }
wait_for_max_lag unless closing?
# wait_for_max_lag unless closing?
each_follower &.append(path, obj)
end

def delete_file(path : String)
@files.delete(path)
wait_for_max_lag unless closing?
# wait_for_max_lag unless closing?
each_follower &.delete(path)
end

Expand Down Expand Up @@ -110,22 +110,33 @@ module LavinMQ
end
end

def wait_for_max_lag
# was_closing = @closing
until @closing || @followers.size >= Config.instance.min_followers
@followers_changed.receive
end
# unless (!was_closing && @closing) || @followers.size >= Config.instance.min_followers
# raise Exception.new("Not enough followers")
# end
# use waitgroup instead l8er
@followers.each_with_index do |f, i|
break if i > Config.instance.min_followers
f.wait_for_max_lag
def has_max_lag?
@followers.any?(&.has_max_lag?)
end

def has_enough_followers?
@followers.size >= Config.instance.min_followers
end

def set_flow_control?
if has_enough_followers?
return has_max_lag?
else
return true
end
rescue Channel::ClosedError
end

# def wait_for_max_lag
# until @closing || @followers.size >= Config.instance.min_followers
# @followers_changed.receive
# end
# @followers.each_with_index do |f, i|
# break if i > Config.instance.min_followers
# f.wait_for_max_lag
# end
# rescue Channel::ClosedError
# end

private def password : String
path = File.join(Config.instance.data_dir, ".replication_secret")
begin
Expand Down Expand Up @@ -163,14 +174,14 @@ module LavinMQ
follower.full_sync
@followers << follower
end
followers_changed.try_send nil
# followers_changed.try_send nil
begin
follower.read_acks
ensure
@lock.synchronize do
@followers.delete(follower)
end
followers_changed.try_send nil
# followers_changed.try_send nil
end
rescue ex : AuthenticationError
Log.warn { "Follower negotiation error" }
Expand Down Expand Up @@ -198,14 +209,14 @@ module LavinMQ
end
@followers.clear
end
@followers_changed.close
# @followers_changed.close
Log.debug { "closed" }
Fiber.yield # required for follower/listener fibers to actually finish
end

def closing
@closing = true
@followers_changed.try_send? nil
# @followers_changed.try_send? nil
end

private def each_follower(& : Follower -> Nil) : Nil
Expand Down
3 changes: 1 addition & 2 deletions src/lavinmq/server.cr
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,6 @@ module LavinMQ
system_metrics(statm)
end
end

control_flow!
sleep Config.instance.stats_interval.milliseconds
end
Expand Down Expand Up @@ -390,7 +389,7 @@ module LavinMQ
getter stats_system_collection_duration_seconds = Time::Span.new

private def control_flow!
if disk_full?
if disk_full? || @replicator.set_flow_control?
if flow?
@log.info { "Low disk space: #{@disk_free.humanize}B, stopping flow" }
flow(false)
Expand Down

0 comments on commit 9576048

Please sign in to comment.