Skip to content

Commit

Permalink
move consumers deliver_loop to session, specs do not pass
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Oct 10, 2024
1 parent c0455ef commit 94c550a
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 15 deletions.
15 changes: 0 additions & 15 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -161,21 +161,6 @@ module LavinMQ

def initialize(@client : Client, @session : MQTT::Session)
@has_capacity.try_send? true
spawn deliver_loop, name: "Consumer deliver loop", same_thread: true
end

private def deliver_loop
session = @session
i = 0
# move deliver loop to session in order to control flow based on the consumer
loop do
session.consume_get(self) do |env|
deliver(env.message, env.segment_position, env.redelivered)
end
Fiber.yield if (i &+= 1) % 32768 == 0
end
rescue ex
puts "deliver loop exiting: #{ex.inspect_with_backtrace}"
end

def details_tuple
Expand Down
15 changes: 15 additions & 0 deletions src/lavinmq/mqtt/session.cr
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,26 @@ module LavinMQ
@count = 0u16
@unacked = Deque(SegmentPosition).new
super(@vhost, @name, false, @auto_delete, arguments)
spawn deliver_loop, name: "Consumer deliver loop", same_thread: true
end

def clean_session?
@auto_delete
end

private def deliver_loop
i = 0
loop do
break if consumers.empty?
consume_get(consumers.first) do |env|
consumers.first.deliver(env.message, env.segment_position, env.redelivered)
end
Fiber.yield if (i &+= 1) % 32768 == 0
end
rescue ex
puts "deliver loop exiting: #{ex.inspect_with_backtrace}"
end

def client=(client : MQTT::Client?)
return if @closed
@last_get_time = RoughTime.monotonic
Expand All @@ -34,6 +48,7 @@ module LavinMQ

if c = client
@consumers << MqttConsumer.new(c, self)
spawn deliver_loop, name: "Consumer deliver loop", same_thread: true
end
@log.debug { "Setting MQTT client" }
end
Expand Down

0 comments on commit 94c550a

Please sign in to comment.