From ad09683618814dccfdab55ff009a662dc62b6037 Mon Sep 17 00:00:00 2001 From: Christina Date: Tue, 14 Nov 2023 16:00:46 +0100 Subject: [PATCH] Beginning of replication settings spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Jon Börjesson jon@84codes.com --- spec/replication_spec.cr | 61 +++++++++++++++++++++++++++++++ spec/spec_helper.cr | 2 +- src/lavinmq/replication/client.cr | 4 +- src/lavinmq/replication/server.cr | 25 ++++++++++--- src/lavinmq/server.cr | 2 + src/lavinmq/vhost.cr | 5 ++- 6 files changed, 89 insertions(+), 10 deletions(-) diff --git a/spec/replication_spec.cr b/spec/replication_spec.cr index e3851c52d0..1bb2cd4522 100644 --- a/spec/replication_spec.cr +++ b/spec/replication_spec.cr @@ -65,3 +65,64 @@ describe LavinMQ::Replication::Client do end end end + +describe LavinMQ::Replication::Server do + data_dir = "/tmp/lavinmq-follower" + + before_each do + FileUtils.rm_rf data_dir + Dir.mkdir_p data_dir + File.write File.join(data_dir, ".replication_secret"), Server.@replicator.@password, 0o400 + Server.vhosts["/"].declare_queue("repli", true, false) + LavinMQ::Config.instance.min_followers = 1 + end + + after_each do + FileUtils.rm_rf data_dir + LavinMQ::Config.instance.min_followers = 0 + end + + it "+min_followers" do + done = Channel(Nil).new + client : AMQP::Client::Connection? = nil + spawn do + with_channel do |ch, conn| + client = conn + q = ch.queue("repli") + q.publish_confirm "hello world" + done.send nil + end + end + select + when done.receive + fail "should not receive message" + when timeout(0.1.seconds) + pp "timeout" + client.try &.close(no_wait: true) + Server.close + end + + # q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + # q.basic_get(true) { }.should be_false + + # repli = LavinMQ::Replication::Client.new(data_dir) + # spawn do + # repli.follow("127.0.0.1", LavinMQ::Config.instance.replication_port) + # end + # with_channel do |ch| + # ch.basic_publish "hello world", "", "repli" + # end + # q = Server.vhosts["/"].queues["repli"].as(LavinMQ::Queue) + # q.basic_get(true) { }.should be_true + + # repli.close + end + + it "-min_followers" do + end + + it "+max_lag" do + end + it "-max_lag" do + end +end diff --git a/spec/spec_helper.cr b/spec/spec_helper.cr index f01fe6b6df..fa622ab920 100644 --- a/spec/spec_helper.cr +++ b/spec/spec_helper.cr @@ -37,7 +37,7 @@ def with_channel(**args, &) args = {port: LavinMQ::Config.instance.amqp_port, name: name}.merge(args) conn = AMQP::Client.new(**args).connect ch = conn.channel - yield ch + yield ch, conn ensure conn.try &.close(no_wait: false) end diff --git a/src/lavinmq/replication/client.cr b/src/lavinmq/replication/client.cr index eea6480f9e..74fb1c7417 100644 --- a/src/lavinmq/replication/client.cr +++ b/src/lavinmq/replication/client.cr @@ -10,13 +10,13 @@ module LavinMQ @data_dir_lock : DataDirLock? @closed = false - def initialize(@data_dir : String) + def initialize(@data_dir : String, pwd : String? = nil) System.maximize_fd_limit @socket = TCPSocket.new @socket.sync = true @socket.read_buffering = false @lz4 = Compress::LZ4::Reader.new(@socket) - @password = password + @password = pwd || password @files = Hash(String, File).new do |h, k| path = File.join(@data_dir, k) Dir.mkdir_p File.dirname(path) diff --git a/src/lavinmq/replication/server.cr b/src/lavinmq/replication/server.cr index db6e80cb04..558b220e47 100644 --- a/src/lavinmq/replication/server.cr +++ b/src/lavinmq/replication/server.cr @@ -24,9 +24,9 @@ module LavinMQ @followers = Array(Follower).new @password : String @files = Hash(String, MFile?).new + @closing = false - def initialize - @password = password + def initialize(@password = password) @tcp = TCPServer.new @tcp.sync = false @tcp.reuse_address = true @@ -101,13 +101,18 @@ module LavinMQ end def wait_for_max_lag - until @followers.size >= Config.instance.min_followers - break unless @followers_changed.receive? + # was_closing = @closing + until @closing || @followers.size >= Config.instance.min_followers + @followers_changed.receive end + # unless (!was_closing && @closing) || @followers.size >= Config.instance.min_followers + # raise Exception.new("Not enough followers") + # end @followers.each_with_index do |f, i| break if i > Config.instance.min_followers f.wait_for_max_lag end + rescue Channel::ClosedError end private def password : String @@ -147,12 +152,14 @@ module LavinMQ follower.full_sync @followers << follower end + followers_changed.try_send nil begin follower.read_acks ensure @lock.synchronize do @followers.delete(follower) end + followers_changed.try_send nil end rescue ex : AuthenticationError Log.warn { "Follower negotiation error" } @@ -165,14 +172,22 @@ module LavinMQ end def close + Log.debug { "closing" } @tcp.close @lock.synchronize do @followers.each &.close @followers.clear end + @followers_changed.close + Log.debug { "closed" } Fiber.yield # required for follower/listener fibers to actually finish end + def closing + @closing = true + @followers_changed.send nil + end + private def each_follower(& : Follower -> Nil) : Nil @lock.synchronize do @followers.each do |f| @@ -220,7 +235,6 @@ module LavinMQ def read_acks(socket = @socket) : Nil spawn action_loop, name: "Follower#action_loop" - @server.followers_changed.try_send nil loop do len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) @acked_bytes += len @@ -413,7 +427,6 @@ module LavinMQ Log.info { "Disconnected" } @actions.close @ack.close - @server.followers_changed.try_send nil @lz4.close @socket.close rescue IO::Error diff --git a/src/lavinmq/server.cr b/src/lavinmq/server.cr index 1d38c88f16..c364b905c1 100644 --- a/src/lavinmq/server.cr +++ b/src/lavinmq/server.cr @@ -168,10 +168,12 @@ module LavinMQ def close @closed = true + @replicator.closing @log.debug { "Closing listeners" } @listeners.each_key &.close @log.debug { "Closing vhosts" } @vhosts.close + @log.debug { "Closing replicator" } @replicator.close end diff --git a/src/lavinmq/vhost.cr b/src/lavinmq/vhost.cr index 72248a2e0b..65662b3368 100644 --- a/src/lavinmq/vhost.cr +++ b/src/lavinmq/vhost.cr @@ -444,8 +444,11 @@ module LavinMQ sleep 0.1 end # then force close the remaining (close tcp socket) + @log.debug {"force closing connection"} unless connections.empty? + @connections.each &.force_close Fiber.yield # yield so that Client read_loops can shutdown + @log.debug { "Closing queues" } @queues.each_value &.close Fiber.yield compact! @@ -610,7 +613,7 @@ module LavinMQ end io.fsync File.rename io.path, @definitions_file_path - @replicator.replace_file @definitions_file_path + # @replicator.replace_file @definitions_file_path @definitions_file.close @definitions_file = io end