Skip to content

Commit

Permalink
wrap vhost in broker for mqtt::client
Browse files Browse the repository at this point in the history
  • Loading branch information
kickster97 committed Sep 23, 2024
1 parent 790ac9a commit 7130ee3
Show file tree
Hide file tree
Showing 11 changed files with 165 additions and 141 deletions.
2 changes: 1 addition & 1 deletion spec/mqtt/integrations/connect_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ module MqttSpecs
end
end

pending "session present when reconnecting a non-clean session [MQTT-3.1.2-4]" do
it "session present when reconnecting a non-clean session [MQTT-3.1.2-4]" do
with_server do |server|
with_client_io(server) do |io|
connect(io, clean_session: false)
Expand Down
2 changes: 1 addition & 1 deletion src/lavinmq/config.cr
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require "./in_memory_backend"

module LavinMQ
class Config
DEFAULT_LOG_LEVEL = Log::Severity::Info
DEFAULT_LOG_LEVEL = Log::Severity::Trace

property data_dir : String = ENV.fetch("STATE_DIRECTORY", "/var/lib/lavinmq")
property config_file = File.exists?(File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini")) ? File.join(ENV.fetch("CONFIGURATION_DIRECTORY", "/etc/lavinmq"), "lavinmq.ini") : ""
Expand Down
32 changes: 16 additions & 16 deletions src/lavinmq/http/controller/queues.cr
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,22 @@ module LavinMQ
end
end

get "/api/queues/:vhost/:name/unacked" do |context, params|
with_vhost(context, params) do |vhost|
refuse_unless_management(context, user(context), vhost)
q = queue(context, params, vhost)
unacked_messages = q.consumers.each.flat_map do |c|
c.unacked_messages.each.compact_map do |u|
next unless u.queue == q
if consumer = u.consumer
UnackedMessage.new(c.channel, u.tag, u.delivered_at, consumer.tag)
end
end
end
unacked_messages = unacked_messages.chain(q.basic_get_unacked.each)
page(context, unacked_messages)
end
end
# get "/api/queues/:vhost/:name/unacked" do |context, params|
# with_vhost(context, params) do |vhost|
# refuse_unless_management(context, user(context), vhost)
# q = queue(context, params, vhost)
# # unacked_messages = q.consumers.each.flat_map do |c|
# # c.unacked_messages.each.compact_map do |u|
# # next unless u.queue == q
# # if consumer = u.consumer
# # UnackedMessage.new(c.channel, u.tag, u.delivered_at, consumer.tag)
# # end
# # end
# # end
# # unacked_messages = unacked_messages.chain(q.basic_get_unacked.each)
# # page(context, unacked_messages)
# end
# end

put "/api/queues/:vhost/:name" do |context, params|
with_vhost(context, params) do |vhost|
Expand Down
24 changes: 24 additions & 0 deletions src/lavinmq/mqtt/broker.cr
Original file line number Diff line number Diff line change
@@ -1,6 +1,30 @@
module LavinMQ
module MQTT
class Broker

getter vhost
def initialize(@vhost : VHost)
@queues = Hash(String, Session).new
@sessions = Hash(String, Session).new
end

def start_session(client : Client)
client_id = client.client_id
session = MQTT::Session.new(self, client_id)
@sessions[client_id] = session
@queues[client_id] = session
end

def clear_session(client : Client)
@sessions.delete client.client_id
@queues.delete client.client_id
end

# def connected(client) : MQTT::Session
# session = Session.new(client.vhost, client.client_id)
# session.connect(client)
# session
# end
end
end
end
Empty file removed src/lavinmq/mqtt/channel.cr
Empty file.
211 changes: 106 additions & 105 deletions src/lavinmq/mqtt/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -6,120 +6,33 @@ require "./session"

module LavinMQ
module MQTT
class MqttConsumer < LavinMQ::Client::Channel::Consumer
getter unacked = 0_u32
getter tag : String = "mqtt"
property prefetch_count = 1

def initialize(@client : Client, @queue : Queue)
@has_capacity.try_send? true
spawn deliver_loop, name: "Consumer deliver loop", same_thread: true
end

private def deliver_loop
queue = @queue
i = 0
loop do
queue.consume_get(self) do |env|
deliver(env.message, env.segment_position, env.redelivered)
end
Fiber.yield if (i &+= 1) % 32768 == 0
end
rescue LavinMQ::Queue::ClosedError
rescue ex
puts "deliver loop exiting: #{ex.inspect}"
end

def details_tuple
{
queue: {
name: "mqtt.client_id",
vhost: "mqtt",
},
}
end

def no_ack?
true
end

def accepts? : Bool
true
end

def deliver(msg, sp, redelivered = false, recover = false)
packet_id = nil
if message_id = msg.properties.message_id
packet_id = message_id.to_u16 unless message_id.empty?
end
pub_args = {
packet_id: packet_id,
payload: msg.body,
dup: false,
qos: 0u8,
retain: false,
topic: "test",
}
@client.send(::MQTT::Protocol::Publish.new(**pub_args))
# MQTT::Protocol::PubAck.from_io(io) if pub_args[:qos].positive? && expect_response
end

def exclusive?
true
end

def cancel
end

def close
end

def closed?
false
end

def flow(active : Bool)
end

getter has_capacity = ::Channel(Bool).new

def ack(sp)
end

def reject(sp, requeue = false)
end

def priority
0
end
end

class Client < LavinMQ::Client
include Stats
include SortableJSON

getter vhost, channels, log, name, user, client_id
getter vhost, channels, log, name, user, client_id, socket
@channels = Hash(UInt16, Client::Channel).new
session : MQTT::Session
@session : MQTT::Session?
rate_stats({"send_oct", "recv_oct"})
Log = ::Log.for "MQTT.client"

def initialize(@socket : ::IO,
@connection_info : ConnectionInfo,
@vhost : VHost,
@user : User,
@vhost : VHost,
@broker : MQTT::Broker,
@client_id : String,
@clean_session = false,
@will : MQTT::Will? = nil)
@will : MQTT::Will? = nil
)
@io = MQTT::IO.new(@socket)
@lock = Mutex.new
@remote_address = @connection_info.src
@local_address = @connection_info.dst
@name = "#{@remote_address} -> #{@local_address}"
@metadata = ::Log::Metadata.new(nil, {vhost: @vhost.name, address: @remote_address.to_s})
@metadata = ::Log::Metadata.new(nil, {vhost: @broker.vhost.name, address: @remote_address.to_s})
@log = Logger.new(Log, @metadata)
@vhost.add_connection(self)
session = start_session(self)
# @session = @broker.connected(self)
@log.info { "Connection established for user=#{@user.name}" }
spawn read_loop
end
Expand All @@ -146,7 +59,7 @@ module LavinMQ
publish_will if @will
disconnect_session(self) if @clean_session
@socket.close
@vhost.rm_connection(self)
@broker.vhost.rm_connection(self)
end

def read_and_handle_packet
Expand Down Expand Up @@ -178,14 +91,14 @@ module LavinMQ
send MQTT::PingResp.new
end

def recieve_publish(packet)
def recieve_publish(packet : MQTT::Publish)
rk = topicfilter_to_routingkey(packet.topic)
props = AMQ::Protocol::Properties.new(
message_id: packet.packet_id.to_s
)
# TODO: String.new around payload.. should be stored as Bytes
msg = Message.new("amq.topic", rk, String.new(packet.payload), props)
@vhost.publish(msg)
@broker.vhost.publish(msg)
# Ok to not send anything if qos = 0 (at most once delivery)
if packet.qos > 0 && (packet_id = packet.packet_id)
send(MQTT::PubAck.new(packet_id))
Expand All @@ -201,14 +114,14 @@ module LavinMQ
auto_delete = true
tbl = AMQP::Table.new
# TODO: declare Session instead
q = @vhost.declare_queue(name, durable, auto_delete, tbl)
q = @broker.vhost.declare_queue(name, durable, auto_delete, tbl)
qos = Array(MQTT::SubAck::ReturnCode).new
packet.topic_filters.each do |tf|
qos << MQTT::SubAck::ReturnCode.from_int(tf.qos)
rk = topicfilter_to_routingkey(tf.topic)
@vhost.bind_queue(name, "amq.topic", rk)
@broker.vhost.bind_queue(name, "amq.topic", rk)
end
queue = @vhost.queues[name]
queue = @broker.vhost.queues[name]
consumer = MqttConsumer.new(self, queue)
queue.add_consumer(consumer)
send(MQTT::SubAck.new(qos, packet.packet_id))
Expand All @@ -223,7 +136,7 @@ module LavinMQ

def details_tuple
{
vhost: @vhost.name,
vhost: @broker.vhost.name,
user: @user.name,
protocol: "MQTT",
client_id: @client_id,
Expand All @@ -233,14 +146,14 @@ module LavinMQ
def start_session(client) : MQTT::Session
if @clean_session
pp "clear session"
@vhost.clear_session(client)
@broker.clear_session(client)
end
@vhost.start_session(client)
@broker.start_session(client)
end

def disconnect_session(client)
pp "disconnect session"
@vhost.clear_session(client)
@broker.clear_session(client)
end

# TODO: actually publish will to session
Expand All @@ -261,5 +174,93 @@ module LavinMQ
def force_close
end
end

class MqttConsumer < LavinMQ::Client::Channel::Consumer
getter unacked = 0_u32
getter tag : String = "mqtt"
property prefetch_count = 1

def initialize(@client : Client, @queue : Queue)
@has_capacity.try_send? true
spawn deliver_loop, name: "Consumer deliver loop", same_thread: true
end

private def deliver_loop
queue = @queue
i = 0
loop do
queue.consume_get(self) do |env|
deliver(env.message, env.segment_position, env.redelivered)
end
Fiber.yield if (i &+= 1) % 32768 == 0
end
rescue LavinMQ::Queue::ClosedError
rescue ex
puts "deliver loop exiting: #{ex.inspect}"
end

def details_tuple
{
queue: {
name: "mqtt.client_id",
vhost: "mqtt",
},
}
end

def no_ack?
true
end

def accepts? : Bool
true
end

def deliver(msg, sp, redelivered = false, recover = false)
packet_id = nil
if message_id = msg.properties.message_id
packet_id = message_id.to_u16 unless message_id.empty?
end
pub_args = {
packet_id: packet_id,
payload: msg.body,
dup: false,
qos: 0u8,
retain: false,
topic: "test",
}
@client.send(::MQTT::Protocol::Publish.new(**pub_args))
# MQTT::Protocol::PubAck.from_io(io) if pub_args[:qos].positive? && expect_response
end

def exclusive?
true
end

def cancel
end

def close
end

def closed?
false
end

def flow(active : Bool)
end

getter has_capacity = ::Channel(Bool).new

def ack(sp)
end

def reject(sp, requeue = false)
end

def priority
0
end
end
end
end
Loading

0 comments on commit 7130ee3

Please sign in to comment.