Skip to content

Commit

Permalink
StreamQueue: only unmap unused segments
Browse files Browse the repository at this point in the history
Previous behavior could unmap segments that other consumers were still
delivering from, causing segfaults.
  • Loading branch information
carlhoerberg committed Aug 23, 2023
1 parent de3ffdb commit 5b1b092
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
21 changes: 21 additions & 0 deletions src/lavinmq/queue/stream_queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ module LavinMQ
class StreamQueue < Queue
@durable = true

def initialize(@vhost : VHost, @name : String,
@exclusive = false, @auto_delete = false,
@arguments = Hash(String, AMQP::Field).new)
super
spawn unmap_unused_segments_loop, name: "StreamQueue#unmap_unused_segments_loop"
end

def apply_policy(policy : Policy?, operator_policy : OperatorPolicy?)
super
if max_age_value = Policy.merge_definitions(policy, operator_policy)["max-age"]?
Expand Down Expand Up @@ -140,5 +147,19 @@ module LavinMQ
raise LavinMQ::Error::PreconditionFailed.new("max-age must be a string")
end
end

private def unmap_unused_segments_loop
until closed?
sleep 60
unmap_unused_segments
end
end

private def unmap_unused_segments
used_segments = @consumers_lock.synchronize { @consumers.map &.as(Client::Channel::StreamConsumer).segment }
@msg_store_lock.synchronize do
stream_queue_msg_store.unmap_segments(except: used_segments)
end
end
end
end
11 changes: 8 additions & 3 deletions src/lavinmq/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ module LavinMQ
drop_overflow
end

def unmap_segments(except : Enumerable(UInt32))
@segments.each do |seg_id, mfile|
next if mfile == @wfile
next if except.includes? seg_id
mfile.unmap
end
end

private def get_last_offset : Int64
return 0i64 if @size.zero?
bytesize = 0_u32
Expand Down Expand Up @@ -61,15 +69,13 @@ module LavinMQ
{@last_offset + 1, @segments.last_key, @segments.last_value.size.to_u32}
end

# ameba:disable Metrics/CyclomaticComplexity
private def find_offset_in_segments(offset : Int | Time) : Tuple(Int64, UInt32, UInt32)
segment = @segments.first_key
pos = 4u32
msg_offset = 0i64
loop do
rfile = @segments[segment]?
if rfile.nil? || pos == rfile.size
rfile.unmap if rfile && rfile != @wfile
if segment = @segments.each_key.find { |sid| sid > segment }
rfile = @segments[segment]
pos = 4_u32
Expand Down Expand Up @@ -101,7 +107,6 @@ module LavinMQ
rfile = @segments[consumer.segment]? || next_segment(consumer) || return
if consumer.pos == rfile.size # EOF
return if rfile == @wfile
rfile.unmap
rfile = next_segment(consumer) || return
end
begin
Expand Down

0 comments on commit 5b1b092

Please sign in to comment.