Skip to content

Commit

Permalink
abort pending actions when follower closes
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Dec 12, 2024
1 parent 1c1d372 commit c4df472
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 20 deletions.
16 changes: 16 additions & 0 deletions src/lavinmq/clustering/actions.cr
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module LavinMQ

abstract def lag_size : Int64
abstract def send(socket : IO, log = Log) : Int64
abstract def abort

getter filename

Expand Down Expand Up @@ -56,6 +57,12 @@ module LavinMQ
end
end
end

def abort
if mfile = @mfile
mfile.unreserve
end
end
end

struct AppendAction < Action
Expand Down Expand Up @@ -99,6 +106,12 @@ module LavinMQ
log.debug { "Append #{len} bytes to #{@filename}" }
len
end

def abort
if fr = @obj.as?(FileRange)
fr.mfile.unreserve
end
end
end

struct DeleteAction < Action
Expand All @@ -115,6 +128,9 @@ module LavinMQ
socket.write_bytes 0i64
0i64
end

def abort
end
end
end
end
36 changes: 16 additions & 20 deletions src/lavinmq/clustering/follower.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ require "./actions"
require "./file_index"
require "../config"
require "socket"
require "wait_group"

module LavinMQ
module Clustering
Expand All @@ -11,7 +12,7 @@ module LavinMQ
@acked_bytes = 0_i64
@sent_bytes = 0_i64
@actions = Channel(Action).new(Config.instance.clustering_max_unsynced_actions)
@closed = false
@running = WaitGroup.new
getter id = -1
getter remote_address

Expand Down Expand Up @@ -47,6 +48,7 @@ module LavinMQ
end

def action_loop(lz4 = @lz4)
@running.add
while action = @actions.receive?
action.send(lz4, Log)
sent_bytes = action.lag_size.to_i64
Expand All @@ -58,12 +60,7 @@ module LavinMQ
sync(sent_bytes)
end
ensure
begin
@lz4.close
@socket.close
rescue IO::Error
# ignore connection errors while closing
end
@running.done
end

private def sync(bytes, socket = @socket) : Nil
Expand All @@ -75,9 +72,6 @@ module LavinMQ
private def read_ack(socket = @socket) : Int64
len = socket.read_bytes(Int64, IO::ByteFormat::LittleEndian)
@acked_bytes += len
if @closed && lag_in_bytes.zero?
@closed_and_in_sync.close
end
len
end

Expand Down Expand Up @@ -172,18 +166,20 @@ module LavinMQ
lag_size
end

@closed_and_in_sync = Channel(Nil).new

def close(timeout : Time::Span = 30.seconds)
@closed = true
@actions.close
if lag_in_bytes > 0
Log.info { "Waiting for follower to be in sync" }
select
when @closed_and_in_sync.receive?
when timeout(timeout)
Log.warn { "Timeout waiting for follower to be in sync" }
end
@running.wait # let action_loop finish

# abort remaining actions (unmap pending files)
while action = @actions.receive?
action.abort
end

begin
@lz4.close
@socket.close
rescue IO::Error
# ignore connection errors while closing
end
end

Expand Down

0 comments on commit c4df472

Please sign in to comment.