Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT-support #766

Open
wants to merge 188 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
188 commits
Select commit Hold shift + click to select a range
505588b
beginning of mqtt-poc, ping
kickster97 Aug 26, 2024
45ba0f9
add send/recieve oct_count, make prometheus controller compile
kickster97 Aug 27, 2024
6492763
client stores client_id and quick solution for UI
kickster97 Aug 28, 2024
4d57ff0
amqp publish
kickster97 Sep 6, 2024
8e6c6cf
mqtt integration spec: ping
snichme Sep 6, 2024
7347ed8
fixup! amqp publish
kickster97 Sep 6, 2024
2165723
int-specs
snichme Sep 6, 2024
fdad4d5
cleanup
snichme Sep 6, 2024
77963a9
connect specs (pending)
snichme Sep 10, 2024
48dd059
all integration specs (pending)
snichme Sep 10, 2024
c702b01
Synchronizer -> channel
snichme Sep 10, 2024
2a381b6
spec: publish mqtt ends up in amqp queue
snichme Sep 11, 2024
8076cdd
send puback after publish if qos > 0
kickster97 Sep 12, 2024
88ef613
publish will to session WIP
kickster97 Sep 12, 2024
21df350
publish returns PubAck
snichme Sep 12, 2024
ab83ec2
cleanup will preparation
kickster97 Sep 16, 2024
0d402a8
Got first working subscribe
snichme Sep 16, 2024
8b08d11
subscribe specs
snichme Sep 16, 2024
d0ba355
packet_id stuff
snichme Sep 16, 2024
b6d3996
pass more connect specs
kickster97 Sep 17, 2024
db6abee
work on subscriber specs
kickster97 Sep 17, 2024
6a00891
start working on broker for clients
kickster97 Sep 17, 2024
592600c
wrap vhost in broker for mqtt::client
kickster97 Sep 23, 2024
4f20533
pass more specs and handle session_present
kickster97 Sep 23, 2024
5f588ab
all connect specs working except replace client with new connection
kickster97 Sep 23, 2024
6042f6f
fix logs in client and dot expect connack in specs
kickster97 Sep 24, 2024
47be1ee
add sessions to broker, and handle incoming subscribe with session
kickster97 Sep 25, 2024
622ac3c
fixup! add sessions to broker, and handle incoming subscribe with ses…
kickster97 Sep 25, 2024
421a098
fixup! fixup! add sessions to broker, and handle incoming subscribe w…
kickster97 Sep 25, 2024
44b791d
subscribe and bind a session
kickster97 Sep 27, 2024
c931ce2
temp
kickster97 Oct 1, 2024
b5d3a98
exchange.publish
snichme Sep 18, 2024
dafc567
mqtt exchange
snichme Oct 1, 2024
8baf983
refactor mqtt session
kickster97 Oct 2, 2024
585cb6b
add get method to prepare for qos 1
kickster97 Oct 2, 2024
5d76339
feel like this got ugly, but subscribe specs are now passing
kickster97 Oct 3, 2024
81467de
cleanup old code and use session instead of queue
kickster97 Oct 3, 2024
7f238e7
refctoring consumer handling, and handle msg acks better
kickster97 Oct 10, 2024
5bfa977
move consumers deliver_loop to session, specs do not pass
kickster97 Oct 10, 2024
4ecf4a9
cleanup
kickster97 Oct 10, 2024
57e0ac8
SubscriptionTree
snichme Oct 10, 2024
0650194
rebase main
kickster97 Oct 14, 2024
e1a2b82
format
kickster97 Oct 14, 2024
bd96dcf
override method
snichme Oct 14, 2024
9fc032b
string token specs
snichme Oct 15, 2024
151860d
crystal 1.14 fixes
snichme Oct 15, 2024
578aca1
subscription tree specs
snichme Oct 15, 2024
42ce5ee
beginning of puback
kickster97 Oct 15, 2024
de90cb7
subscription tree specs
snichme Oct 15, 2024
41a2798
remove puts
snichme Oct 15, 2024
db83ff8
Use monkey patched `IO::Buffered#peek(n)` to ensure data exists
spuun Oct 11, 2024
daa13e3
It should return slice
spuun Oct 11, 2024
1daac59
Add spec to verify positive size check
spuun Oct 11, 2024
f431ac4
Rename stuff in an attempt to make things more readable
spuun Oct 11, 2024
275231b
Refactor specs
spuun Oct 11, 2024
e82506a
Cleaner specs
spuun Oct 11, 2024
32d4e9a
Update spec description
spuun Oct 11, 2024
c036d47
Fix condition when checking if enough data exists
spuun Oct 11, 2024
46c8a39
Move data in existing buffer if needed
spuun Oct 11, 2024
ce245c5
Return what we got instead of rasing in read fails
spuun Oct 11, 2024
e245bcb
Remove unused code
spuun Oct 12, 2024
2ae36c2
merge jons pr and fix sleep problem
kickster97 Oct 15, 2024
8062e81
spec fix for subtree
snichme Oct 15, 2024
f021fda
set dup value when delivering a msg
kickster97 Oct 16, 2024
2dd49d0
cleanup
kickster97 Oct 16, 2024
06d8175
retain_store and topic_tree
kickster97 Oct 16, 2024
fd19bff
Restructure retain-store and topic_tree for better fit in LavinMQ
kickster97 Oct 17, 2024
f0f8f39
publish retained message
kickster97 Oct 17, 2024
ec59be4
Update src/lavinmq/mqtt/retain_store.cr
kickster97 Oct 18, 2024
7d765ad
retained messages spec pass
kickster97 Oct 18, 2024
7453bc5
Improve handling of non-clean sesssions
spuun Oct 18, 2024
1d1cfd0
pass will specs
kickster97 Oct 21, 2024
5c16b7f
cleanup
kickster97 Oct 21, 2024
00250bb
pass duplicate message specs
kickster97 Oct 21, 2024
6b989cc
format
kickster97 Oct 21, 2024
8172225
fix publish
kickster97 Oct 21, 2024
5580ea4
remove obsolete header
kickster97 Oct 21, 2024
9226687
cleanup
kickster97 Oct 21, 2024
0d7a268
raise io error for invalid package_id
kickster97 Oct 21, 2024
25b1aef
handle raise for double connect
kickster97 Oct 21, 2024
cfe5eeb
revert double connect rescue
kickster97 Oct 22, 2024
183609a
Remove unused variable
spuun Oct 22, 2024
11c7f30
Use method overloading instead of type check
spuun Oct 22, 2024
4626c1f
Use lowercase log source
spuun Oct 22, 2024
dd18f2f
Use mqtt.session as log source for Session
spuun Oct 22, 2024
02ecf2e
Add spec to test publisher->subscriber flow
spuun Oct 22, 2024
21ea60f
Add spec to verify that session is restored properly
spuun Oct 22, 2024
7863af6
Use getter instead of instance variable
spuun Oct 22, 2024
17dd34b
beginning of max_inflight
kickster97 Oct 22, 2024
b44990c
send in topic to subscription tree, pass specs
kickster97 Oct 22, 2024
2e49681
clean up
kickster97 Oct 22, 2024
f263009
create publish packet in client instead of accepting will in #broker:…
kickster97 Oct 22, 2024
750dd6a
Be consistent with typing
spuun Oct 22, 2024
8f15518
Add routing specs
spuun Oct 22, 2024
d4f9ad3
Lint
spuun Oct 22, 2024
8618839
Return nil and let spec assert
spuun Oct 23, 2024
62625f8
Add a will spec (and some clean up)
spuun Oct 23, 2024
c95f65e
publish will if PacketDecode exception
kickster97 Oct 23, 2024
d1dcbbd
move vhost logic from client into broker
kickster97 Oct 23, 2024
7ed81b3
Improve logging
spuun Oct 23, 2024
3cdf6aa
add retain_store specs for .retain and .each
kickster97 Oct 23, 2024
358523c
No need to prefix class, use namespace
spuun Oct 24, 2024
2c9ea27
Update src/lavinmq/mqtt/client.cr
kickster97 Oct 24, 2024
42a9f9f
remove unnessecary socket.close
kickster97 Oct 24, 2024
9e1ddb6
log warning instead of raise
kickster97 Oct 24, 2024
7dd27fa
fetch max_inflight_messages form config
kickster97 Oct 24, 2024
03e4e9d
Convert Publish to Message in exchange
spuun Oct 24, 2024
5d77985
Less aggressive logging
spuun Oct 24, 2024
2b1b38a
Suspend fiber while waiting for msg or consumer
spuun Oct 24, 2024
7f44080
add specs for handling connect packets with empty client_ids
kickster97 Oct 25, 2024
e487949
handle connect packets with empty client_id strings
kickster97 Oct 25, 2024
95ac073
fixup! Suspend fiber while waiting for msg or consumer
spuun Oct 29, 2024
f290486
Move Sessions to separate file
spuun Oct 30, 2024
ea924f5
Dont convert topic to routing key, and use topic all the way through
kickster97 Oct 30, 2024
96d5b54
prefix sessions with mqtt. and do not let amqp queues create queues t…
kickster97 Oct 30, 2024
c990e8e
move validation to queue_factory and return preconditioned fail for a…
kickster97 Oct 30, 2024
bc06320
prefix_validation wip
kickster97 Oct 31, 2024
db41e9d
delete old cherry-picked code, replaced with #818
kickster97 Oct 31, 2024
d86b677
mqtt exchange receives MQTT::Publish but publish AMQP::Message to que…
snichme Oct 31, 2024
d0f7879
remove obsolete spec
kickster97 Oct 31, 2024
09241ee
format
kickster97 Nov 1, 2024
08a1963
rebase in abstrace queue
kickster97 Nov 1, 2024
20343ca
adapt for queue abstraction
kickster97 Nov 1, 2024
06a68fa
repain broken amqp specs
kickster97 Nov 4, 2024
0a1f49d
rename prefix validator to namevalidator and move valid_entity_name i…
kickster97 Nov 4, 2024
28650de
remove unnessecary allocation in #NameValidator.valid_prefix
kickster97 Nov 4, 2024
024ff9c
cleanup ordering in connections js
kickster97 Nov 4, 2024
76ab2c0
remove sessions from vhost, redundant
kickster97 Nov 4, 2024
56829e9
use default random instead of secure for client_id
kickster97 Nov 4, 2024
dbc41b6
delete unreferences messages in retain store when building index
kickster97 Nov 4, 2024
8b063ff
move exchange into the mqtt namespace
kickster97 Nov 4, 2024
809626a
use mqtt namespace MqttBindingKey->BindingKey
kickster97 Nov 4, 2024
d6c9d7b
remove redundant return value
kickster97 Nov 4, 2024
813c081
update name validator
kickster97 Nov 6, 2024
df2cb43
move unacked_messagesapi logic to queue and overload method in session
kickster97 Nov 6, 2024
4b9be56
format
kickster97 Nov 6, 2024
f7ac14d
update logs for name validation failures
kickster97 Nov 6, 2024
19272da
don't have risk of overwriting retain store msg files
kickster97 Nov 7, 2024
7e01510
ensure to remove file
kickster97 Nov 7, 2024
84e548b
format
kickster97 Nov 7, 2024
e71ab36
replicate retain store, wip
kickster97 Nov 8, 2024
d3396cf
add mqtts config + listener
kickster97 Nov 11, 2024
6d5ad0c
format
kickster97 Nov 11, 2024
f0ef2d2
replication spec works correctly
kickster97 Nov 12, 2024
031aec7
add mqtt_proxy for clustering client
kickster97 Nov 13, 2024
e33d528
r+ needs file to exist before open
kickster97 Nov 13, 2024
16e48be
format
kickster97 Nov 13, 2024
1491120
fix ameba failures
kickster97 Nov 13, 2024
4c3480b
satisfy ameba for spec files
kickster97 Nov 13, 2024
068a174
rename specfile with suffix
kickster97 Nov 13, 2024
dd1f110
fix flaky wait_for
kickster97 Nov 13, 2024
ccde827
Use enum for protocol to get compile time validation
spuun Nov 13, 2024
aeeb4cb
Fix specs to use protocol enum
spuun Nov 13, 2024
cac1163
use short block notation
kickster97 Nov 13, 2024
6b6188f
fix mqtt exchange routing spec
snichme Nov 14, 2024
71561ac
remove comment
snichme Nov 14, 2024
9815c63
scope fix
snichme Nov 14, 2024
1de72a1
Multi-vhost support
spuun Nov 14, 2024
43bf65f
expand details tuple for consumer UI
kickster97 Nov 14, 2024
ef2c7f3
no need to convert routing key
kickster97 Nov 14, 2024
6f0eedd
format
kickster97 Nov 14, 2024
61a15e2
handle unexpected close from client
kickster97 Nov 14, 2024
0525c76
connection_at for mqtt connections
snichme Nov 14, 2024
131043d
deliver packet not msg from session (#843)
snichme Nov 14, 2024
bf18a56
truncate the previous content before you retain a message
kickster97 Nov 15, 2024
9410a13
safely overwrite retained messages
kickster97 Nov 15, 2024
7ac09ab
Cant use constant as key in NamedTuple
spuun Nov 15, 2024
cabf445
merge solutions for retain store
kickster97 Nov 15, 2024
b29bc61
general fixup after comments
kickster97 Nov 18, 2024
d19e4d1
format
kickster97 Nov 18, 2024
1b87d0e
set flaky spec to pending
kickster97 Nov 18, 2024
1af2f13
just a test
kickster97 Nov 18, 2024
9345b85
just a test
kickster97 Nov 18, 2024
f097cc9
tmp: debug clustering_spec
baelter Nov 19, 2024
69871d2
finalize clustering spec
kickster97 Nov 20, 2024
1dc8cb6
use instance var for index file name
kickster97 Nov 21, 2024
b6b1059
rescue argumenterror in deliver loop
kickster97 Dec 2, 2024
c67089d
format
kickster97 Dec 2, 2024
53854ce
Fix mqtt unix listener (and some logging)
spuun Dec 3, 2024
88a2f28
Prevent Channel::Closed error from being raised
spuun Dec 3, 2024
7eecf47
exception handling for mqtt default bindings
kickster97 Dec 10, 2024
d841337
only allow selected policies to be applied for mqtt session
kickster97 Dec 10, 2024
06162d9
handle qos 2 at build_packet
kickster97 Dec 10, 2024
56a71de
dont allow amqp queues or exchanges to bind to mqtt queues or exchanges
kickster97 Dec 11, 2024
95be31f
forbidden to bind AMQP excahnges to the MQTT Session
kickster97 Dec 12, 2024
d3d6cb1
format
kickster97 Dec 12, 2024
81ee5fd
clean up review comments
kickster97 Dec 13, 2024
2bbbcd9
initialize @broekrs in server instead of in connection_factory
kickster97 Dec 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions shard.lock
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ shards:
git: https://github.com/84codes/lz4.cr.git
version: 1.0.0+git.commit.96d714f7593c66ca7425872fd26c7b1286806d3d

mqtt-protocol:
git: https://github.com/84codes/mqtt-protocol.cr.git
version: 0.2.0+git.commit.3f82ee85d029e6d0505cbe261b108e156df4e598

systemd:
git: https://github.com/84codes/systemd.cr.git
version: 2.0.0
Expand Down
2 changes: 2 additions & 0 deletions shard.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ dependencies:
github: 84codes/systemd.cr
lz4:
github: 84codes/lz4.cr
mqtt-protocol:
github: 84codes/mqtt-protocol.cr

development_dependencies:
ameba:
Expand Down
44 changes: 44 additions & 0 deletions spec/clustering_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ require "./spec_helper"
require "../src/lavinmq/clustering/client"
require "../src/lavinmq/clustering/controller"

alias IndexTree = LavinMQ::MQTT::TopicTree(String)

describe LavinMQ::Clustering::Client do
follower_data_dir = "/tmp/lavinmq-follower"

Expand Down Expand Up @@ -72,6 +74,48 @@ describe LavinMQ::Clustering::Client do
end
end

it "replicates and streams retained messages to followers" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
tcp_server = TCPServer.new("localhost", 0)

spawn(replicator.listen(tcp_server), name: "repli server spec")
config = LavinMQ::Config.new.tap &.data_dir = follower_data_dir
repli = LavinMQ::Clustering::Client.new(config, 1, replicator.password, proxy: false)
done = Channel(Nil).new
spawn(name: "follow spec") do
repli.follow("localhost", tcp_server.local_address.port)
done.send nil
end
wait_for { replicator.followers.size == 1 }

retain_store = LavinMQ::MQTT::RetainStore.new("#{LavinMQ::Config.instance.data_dir}/retain_store", replicator)
wait_for { replicator.followers.first?.try &.lag_in_bytes == 0 }

props = LavinMQ::AMQP::Properties.new
msg1 = LavinMQ::Message.new(100, "test", "rk", props, 10, IO::Memory.new("body1"))
msg2 = LavinMQ::Message.new(100, "test", "rk", props, 10, IO::Memory.new("body2"))
retain_store.retain("topic1", msg1.body_io, msg1.bodysize)
retain_store.retain("topic2", msg2.body_io, msg2.bodysize)

wait_for { replicator.followers.first?.try &.lag_in_bytes == 0 }
repli.close
done.receive

follower_retain_store = LavinMQ::MQTT::RetainStore.new("#{follower_data_dir}/retain_store", LavinMQ::Clustering::NoopServer.new)
a = Array(String).new(2)
b = Array(String).new(2)
follower_retain_store.each("#") do |topic, bytes|
a << topic
b << String.new(bytes)
end

a.sort!.should eq(["topic1", "topic2"])
b.sort!.should eq(["body1", "body2"])
follower_retain_store.retained_messages.should eq(2)
ensure
replicator.try &.close
end

it "can stream full file" do
replicator = LavinMQ::Clustering::Server.new(LavinMQ::Config.instance, LavinMQ::Etcd.new, 0)
tcp_server = TCPServer.new("localhost", 0)
Expand Down
39 changes: 39 additions & 0 deletions spec/message_routing_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -421,3 +421,42 @@ describe LavinMQ::Exchange do
end
end
end

describe LavinMQ::MQTT::Exchange do
it "should only allow Session to bind" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
q1 = LavinMQ::AMQP::Queue.new(vhost, "q1")
s1 = LavinMQ::MQTT::Session.new(vhost, "q1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
expect_raises(LavinMQ::Exchange::AccessRefused) do
x.bind(q1, "q1", LavinMQ::AMQP::Table.new)
end
end
end

it "publish messages to queues with it's own publish method" do
with_amqp_server do |s|
vhost = s.vhosts.create("x")
s1 = LavinMQ::MQTT::Session.new(vhost, "session 1")
index = LavinMQ::MQTT::TopicTree(String).new
store = LavinMQ::MQTT::RetainStore.new("tmp/retain_store", LavinMQ::Clustering::NoopServer.new, index)
x = LavinMQ::MQTT::Exchange.new(vhost, "mqtt.default", store)
x.bind(s1, "s1", LavinMQ::AMQP::Table.new)
pub_args = {
packet_id: 1u16,
payload: Bytes.new(0),
dup: false,
qos: 0u8,
retain: false,
topic: "s1",
}
msg = MQTT::Protocol::Publish.new(**pub_args)
x.publish(msg)
s1.message_count.should eq 1
end
end
end
Loading
Loading