diff --git a/src/lavinmq/clustering/actions.cr b/src/lavinmq/clustering/actions.cr index 1f1685441..ec4a19b99 100644 --- a/src/lavinmq/clustering/actions.cr +++ b/src/lavinmq/clustering/actions.cr @@ -14,6 +14,7 @@ module LavinMQ abstract def lag_size : Int64 abstract def send(socket : IO, log = Log) : Int64 + abstract def abort getter filename @@ -56,6 +57,12 @@ module LavinMQ end end end + + def abort + if mfile = @mfile + mfile.unreserve + end + end end struct AppendAction < Action @@ -99,6 +106,12 @@ module LavinMQ log.debug { "Append #{len} bytes to #{@filename}" } len end + + def abort + if fr = @obj.as?(FileRange) + fr.mfile.unreserve + end + end end struct DeleteAction < Action @@ -115,6 +128,9 @@ module LavinMQ socket.write_bytes 0i64 0i64 end + + def abort + end end end end diff --git a/src/lavinmq/clustering/follower.cr b/src/lavinmq/clustering/follower.cr index 7e72ea672..f9c08f77a 100644 --- a/src/lavinmq/clustering/follower.cr +++ b/src/lavinmq/clustering/follower.cr @@ -2,6 +2,7 @@ require "./actions" require "./file_index" require "../config" require "socket" +require "wait_group" module LavinMQ module Clustering @@ -11,7 +12,7 @@ module LavinMQ @acked_bytes = 0_i64 @sent_bytes = 0_i64 @actions = Channel(Action).new(Config.instance.clustering_max_unsynced_actions) - @closed = false + @running = WaitGroup.new getter id = -1 getter remote_address @@ -47,6 +48,7 @@ module LavinMQ end def action_loop(lz4 = @lz4) + @running.add while action = @actions.receive? action.send(lz4, Log) sent_bytes = action.lag_size.to_i64 @@ -58,12 +60,7 @@ module LavinMQ sync(sent_bytes) end ensure - begin - @lz4.close - @socket.close - rescue IO::Error - # ignore connection errors while closing - end + @running.done end private def sync(bytes, socket = @socket) : Nil @@ -75,9 +72,6 @@ module LavinMQ private def read_ack(socket = @socket) : Int64 len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian) @acked_bytes += len - if @closed && lag_in_bytes.zero? - @closed_and_in_sync.close - end len end @@ -172,18 +166,20 @@ module LavinMQ lag_size end - @closed_and_in_sync = Channel(Nil).new - def close(timeout : Time::Span = 30.seconds) - @closed = true @actions.close - if lag_in_bytes > 0 - Log.info { "Waiting for follower to be in sync" } - select - when @closed_and_in_sync.receive? - when timeout(timeout) - Log.warn { "Timeout waiting for follower to be in sync" } - end + @running.wait # let action_loop finish + + # abort remaining actions (unmap pending files) + while action = @actions.receive? + action.abort + end + + begin + @lz4.close + @socket.close + rescue IO::Error + # ignore connection errors while closing end end