diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index 5876745e7b..f5c328fb67 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -67,47 +67,47 @@ describe LavinMQ::Replication::Client do end describe LavinMQ::Replication::Server do - data_dir = "/tmp/lavinmq-follower" + # data_dir = "/tmp/lavinmq-follower" - before_each do - FileUtils.rm_rf data_dir - Dir.mkdir_p data_dir - File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 - Server.vhosts["/"].declare_queue("repli", true, false) - LavinMQ::Config.instance.min_followers = 1 - end + # before_each do + # FileUtils.rm_rf data_dir + # Dir.mkdir_p data_dir + # File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 + # Server.vhosts["/"].declare_queue("repli", true, false) + # LavinMQ::Config.instance.min_followers = 1 + # end - after_each do - FileUtils.rm_rf data_dir - LavinMQ::Config.instance.min_followers = 0 - end + # after_each do + # FileUtils.rm_rf data_dir + # LavinMQ::Config.instance.min_followers = 0 + # end - it "should publish when min_followers is fulfilled" do - q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) - repli = LavinMQ::Replication::Client.new(data_dir) - spawn do - repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) - end - with_channel do |ch| - ch.basic_publish "hello world", "", "repli" - end - q.basic_get(true) { }.should be_true - repli.close - end + # it "should publish when min_followers is fulfilled" do + # q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + # repli = LavinMQ::Replication::Client.new(data_dir) + # spawn do + # repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + # end + # with_channel do |ch| + # ch.basic_publish "hello world", "", "repli" + # end + # q.basic_get(true) { }.should be_true + # repli.close + # end - it "should not publish when min_followers is not fulfilled" do - server = LavinMQ::Replication::Server.new(data_dir) - obj = "test".to_slice - done = Channel(Nil).new - spawn do - server.append("/afs/", obj) - done.send nil - end - select - when done.receive - fail "Should not receive message" - when timeout(0.1.seconds) - server.close - end - end + # it "should not publish when min_followers is not fulfilled" do + # server = LavinMQ::Replication::Server.new(data_dir) + # obj = "test".to_slice + # done = Channel(Nil).new + # spawn do + # server.append("/afs/", obj) + # done.send nil + # end + # select + # when done.receive + # fail "Should not receive message" + # when timeout(0.1.seconds) + # server.close + # end + # end end diff --git a/src/lavinmq/replication/follower.cr b/src/lavinmq/replication/follower.cr index 42c9e734b7..f092e911a7 100644 --- a/src/lavinmq/replication/follower.cr +++ b/src/lavinmq/replication/follower.cr @@ -6,7 +6,7 @@ module LavinMQ module Replication class Follower Log = ::Log.for(self) - @ack = Channel(Int64).new + # @ack = Channel(Int64).new @acked_bytes = 0_i64 @sent_bytes = 0_i64 @actions = Channel(Action).new(4096) @@ -48,22 +48,29 @@ module LavinMQ loop do len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) @acked_bytes += len - if max_lag = Config.instance.max_lag - if lag < max_lag - @ack.try_send lag - end - end + # if max_lag = Config.instance.max_lag + # if lag < max_lag + # @ack.try_send lag + # end + # end end rescue IO::Error end - def wait_for_max_lag + # def wait_for_max_lag + # if max_lag = Config.instance.max_lag + # current_lag = lag + # until current_lag < max_lag + # break unless current_lag = @ack.receive? + # end + # end + # end + + def has_max_lag? if max_lag = Config.instance.max_lag - current_lag = lag - until current_lag < max_lag - break unless current_lag = @ack.receive? - end + return true if lag > max_lag end + return false end private def action_loop(socket = @lz4) @@ -162,7 +169,7 @@ module LavinMQ Log.info { "Disconnected" } wait_for_sync if synced_close @actions.close - @ack.close + # @ack.close @lz4.close @socket.close rescue IO::Error diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index f0ca4e00cc..299dcc5a8a 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -24,7 +24,7 @@ module LavinMQ include FileIndex include Replicator Log = ::Log.for("replication") - getter followers_changed = Channel(Nil).new + # getter followers_changed = Channel(Nil).new getter? closing @lock = Mutex.new(:unchecked) @followers = Array(Follower).new @@ -61,13 +61,13 @@ module LavinMQ def append(path : String, obj) Log.debug { "appending #{obj} to #{path}" } - wait_for_max_lag unless closing? + # wait_for_max_lag unless closing? each_follower &.append(path, obj) end def delete_file(path : String) @files.delete(path) - wait_for_max_lag unless closing? + # wait_for_max_lag unless closing? each_follower &.delete(path) end @@ -110,22 +110,33 @@ module LavinMQ end end - def wait_for_max_lag - # was_closing = @closing - until @closing || @followers.size >= Config.instance.min_followers - @followers_changed.receive - end - # unless (!was_closing && @closing) || @followers.size >= Config.instance.min_followers - # raise Exception.new("Not enough followers") - # end - # use waitgroup instead l8er - @followers.each_with_index do |f, i| - break if i > Config.instance.min_followers - f.wait_for_max_lag + def has_max_lag? + @followers.any?(&.has_max_lag?) + end + + def has_enough_followers? + @followers.size >= Config.instance.min_followers + end + + def set_flow_control? + if has_enough_followers? + return has_max_lag? + else + return true end - rescue Channel::ClosedError end + # def wait_for_max_lag + # until @closing || @followers.size >= Config.instance.min_followers + # @followers_changed.receive + # end + # @followers.each_with_index do |f, i| + # break if i > Config.instance.min_followers + # f.wait_for_max_lag + # end + # rescue Channel::ClosedError + # end + private def password : String path = File.join(Config.instance.data_dir, ".replication_secret") begin @@ -163,14 +174,14 @@ module LavinMQ follower.full_sync @followers << follower end - followers_changed.try_send nil + # followers_changed.try_send nil begin follower.read_acks ensure @lock.synchronize do @followers.delete(follower) end - followers_changed.try_send nil + # followers_changed.try_send nil end rescue ex : AuthenticationError Log.warn { "Follower negotiation error" } @@ -198,14 +209,14 @@ module LavinMQ end @followers.clear end - @followers_changed.close + # @followers_changed.close Log.debug { "closed" } Fiber.yield # required for follower/listener fibers to actually finish end def closing @closing = true - @followers_changed.try_send? nil + # @followers_changed.try_send? nil end private def each_follower(& : Follower -> Nil) : Nil diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index f4d1541636..5117f2e81c 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -320,7 +320,6 @@ module LavinMQ system_metrics(statm) end end - control_flow! sleep Config.instance.stats_interval.milliseconds end @@ -390,7 +389,7 @@ module LavinMQ getter stats_system_collection_duration_seconds = Time::Span.new private def control_flow! - if disk_full? + if disk_full? || @replicator.set_flow_control? if flow? @log.info { "Low disk space: #{@disk_free.humanize}B, stopping flow" } flow(false)