-
Notifications
You must be signed in to change notification settings - Fork 34
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQTT-support #766
base: main
Are you sure you want to change the base?
MQTT-support #766
Conversation
cb77ee8
to
7130ee3
Compare
I think you can (and maybe even should) build clean session by reusing the auto delete feature of queues. |
787840d
to
4e747f2
Compare
About #803 (comment) That connect spec passes, but for the wrong reason. Or well, the socket is closed but not because it receives the wrong packet type, but because of the bug that #803 will fix. When that bug is fixed i still think that the spec will pass, but for the right reason. I guess the spec could be changed to send a |
After giving it a second thought I've concluded that we should change the spec to use some other packet type. |
ok I will change the spec then! thanks :) |
Often it's more efficient to |
* deliver packet not msg from session * it's already a topic, no convert needed * fixes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Partial review
@@ -73,8 +73,10 @@ module LavinMQ | |||
end | |||
|
|||
private def read_ack(socket = @socket) : Int64 | |||
# Sometimes when running clustering_spec len is greater than sent_bytes. Causing lag_in_bytes to be negative. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how is that possible? And should this comment really be here?
@@ -11,7 +11,7 @@ module LavinMQ | |||
Socket::IPAddress.new("127.0.0.1", 0) # Fake when UNIXAddress | |||
connection_info = ConnectionInfo.new(remote_address, local_address) | |||
io = WebSocketIO.new(ws) | |||
spawn amqp_server.handle_connection(io, connection_info), name: "HandleWSconnection #{remote_address}" | |||
spawn amqp_server.handle_connection(io, connection_info, Server::Protocol::AMQP), name: "HandleWSconnection #{remote_address}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe at some point we should support MQTT over websockets too?
end | ||
end | ||
|
||
def each(subscription : String, &block : String, Bytes -> Nil) : Nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in the only location we use this method we convert the body to a String, which will raise if there is binary data that can't be parsed as utf8, but also, why don't we just return the File/IO object?
def each(subscription : String, &block : String, Bytes -> Nil) : Nil | |
def each(subscription : String, &block : String, IO -> Nil) : Nil |
hash = @hasher.hexfinal | ||
@hasher.reset | ||
"#{hash}#{MESSAGE_FILE_SUFFIX}" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hash = @hasher.hexfinal | |
@hasher.reset | |
"#{hash}#{MESSAGE_FILE_SUFFIX}" | |
"#{@hasher.hexfinal}#{MESSAGE_FILE_SUFFIX}" | |
ensure | |
@hasher.reset | |
def initialize(@users : UserStore, | ||
@vhosts : VHostStore, | ||
replicator : Clustering::Replicator) | ||
@brokers = Brokers.new(@vhosts, replicator) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this be initialized in the Server
instead? Along with VHostStore
?
end | ||
|
||
def close(reason = "") | ||
@log.trace { "Client#close" } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this really a trace? Rather a debug
i think
@log.trace { "Client#close" } | |
@log.debug { "Client#close" } |
elsif q.is_a?(LavinMQ::MQTT::Session) | ||
access_refused(context, "Not allowed to bind to an MQTT session") | ||
elsif e.is_a?(LavinMQ::MQTT::Exchange) | ||
access_refused(context, "Not allowed to bind to the default MQTT exchange") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about handling this with different overloads of Exchange#bind
?
We're already doing that in MQTT::Exchange
to prevent it from being bound to AMQP queues:
https://github.com/cloudamqp/lavinmq/blob/mqtt-poc/src/lavinmq/mqtt/exchange.cr#L90-L122
By doing that you get the same validation for AMQP clients and mgmt ui.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We worked a bit on this and turns out to be trickier than we thought. I put this in the description
The first version of MQTT Support does not support binding an AMQP exchange to and MQTT exchange. (Undefined behaviour if done)
In a separate PR we need to extract the AMQP exchange types into the AMQP namespace and in turn be able to differentiate between an AMQP::Destination and an MQTT::Destiantion when calling bind.
With that in place we will be able to safely do exchange to exchange bindings.
WHAT is this pull request doing?
An MQTT-session inherits AMQP::Queue and extends with MQTT functionality.
MQTT exchange(topic) will only accept activity from an MQTT client, that will be responsible for routing messages to the MQTT-session.
The mqtt exchange handles MQTT routing keys etc. without needing to do conversions.
NOTE
The first version of MQTT Support does not support binding an AMQP exchange to and MQTT exchange. (Undefined behaviour if done)
In a separate PR we need to extract the AMQP exchange types into the AMQP namespace and in turn be able to differentiate between an
AMQP::Destination
and anMQTT::Destiantion
when callingbind
.With that in place we will be able to safely do exchange to exchange bindings.
Docs for the website
HOW can this pull request be tested?
Specs have been migrated from Myra and fully cover the MQTT protocol, run with crystal spec.
Connect with mqtt client library of your choice and test your usage against this branch