Skip to content

Commit

Permalink
Unmap segments when all consumers has disconnected from a queue
Browse files Browse the repository at this point in the history
  • Loading branch information
carlhoerberg committed Sep 8, 2023
1 parent ec55e34 commit 2e43fbb
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 10 deletions.
8 changes: 8 additions & 0 deletions src/lavinmq/queue/message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,14 @@ module LavinMQ
(@bytesize / @size).to_u32
end

def unmap_segments(except : Enumerable(UInt32) = StaticArray(UInt32, 0).new(0u32))
@segments.each do |seg_id, mfile|
next if mfile == @wfile
next if except.includes? seg_id
mfile.unmap
end
end

private def select_next_read_segment : MFile?
# Expect @segments to be ordered
if id = @segments.each_key.find { |sid| sid > @rfile_id }
Expand Down
10 changes: 8 additions & 2 deletions src/lavinmq/queue/queue.cr
Original file line number Diff line number Diff line change
Expand Up @@ -831,8 +831,14 @@ module LavinMQ
end
end
if @consumers.empty?
notify_consumers_empty(true)
delete if @auto_delete
if @auto_delete
delete
else
notify_consumers_empty(true)
@msg_store_lock.synchronize do
@msg_store.unmap_segments
end
end
end
end

Expand Down
8 changes: 0 additions & 8 deletions src/lavinmq/queue/stream_queue_message_store.cr
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,6 @@ module LavinMQ
drop_overflow
end

def unmap_segments(except : Enumerable(UInt32))
@segments.each do |seg_id, mfile|
next if mfile == @wfile
next if except.includes? seg_id
mfile.unmap
end
end

private def get_last_offset : Int64
return 0i64 if @size.zero?
bytesize = 0_u32
Expand Down

0 comments on commit 2e43fbb

Please sign in to comment.