-
Notifications
You must be signed in to change notification settings - Fork 136
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use a queue to wait for Puback packets rather than polling #120
Conversation
This commit changes the `publish` method to use a queue to wait for Puback packets rather than polling a hash. Every time the read loop gets data or a timeout from `IO.select`, it will send a message to everyone waiting for a `Puback` packet. If the we're within the deadline, then the loop executes again, if we got a packet, we'll return the packet, and if we're outside the deadline, a `-1` is returned. This upside is that this patch speeds up the publish method by over 100x. Here is the benchmark: ```ruby require "securerandom" require "mqtt" require "benchmark/ips" require "stackprof" client = MQTT::Client.new(username: 'testuser', password: 'testpasswd', client_id: "client_#{SecureRandom.hex(10)}", host: '127.0.0.1') client.connect Benchmark.ips do |x| x.report("send message") { i = rand(1..10) topic = "to/timebox#{i}/cameras" msg = "message #{Time.now} for timebox#{i}" client.publish(topic, msg, true, 1) } end ``` Before this patch: ``` $ ruby -I lib thing.rb Warming up -------------------------------------- send message 8.000 i/100ms Calculating ------------------------------------- send message 85.042 (± 4.7%) i/s - 432.000 in 5.089261s ``` After this patch: ``` $ ruby -I lib thing.rb Warming up -------------------------------------- send message 915.000 i/100ms Calculating ------------------------------------- send message 9.453k (± 4.5%) i/s - 47.580k in 5.043716s ``` The downside is that the timeout isn't exact. Since `IO.select` times out every `0.5` seconds (according to the `SELECT_TIMEOUT` constant), the deadline in the `publish` method could be missed by that amount of time. Refs njh#115
e097095
to
aa25879
Compare
Awesome, that is great, thanks! I hated that there was a I will get those old/broken versions of ruby removed from Travis, then get this PR merged. |
@njh 🙇🏻♀️ thank you for making this library! |
While I was thinking about this, I realized I've kind of broken the timeout code. We need to add this line: diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb
index 707f932..e6e9884 100644
--- a/lib/mqtt/client.rb
+++ b/lib/mqtt/client.rb
@@ -459,6 +459,7 @@ module MQTT
def receive_packet
# Poll socket - is there data waiting?
result = IO.select([@socket], [], [], SELECT_TIMEOUT)
+ handle_timeouts
unless result.nil?
# Yes - read in the packet
packet = MQTT::Packet.read(@socket) I'm trying to figure out how to write a test for it though. It doesn't seem like there's a way to get the read thread to run in the test framework. |
@njh I added a test for the timeout, but the test is extremely hacky :( It does ensure we hit our timeouts though. |
Removed older rubies, no-longer in rvm
Sorry, I completely forgot to merge this. Done now! |
This commit changes the
publish
method to use a queue to wait forPuback packets rather than polling a hash. Every time the read loop
gets data or a timeout from
IO.select
, it will send a message toeveryone waiting for a
Puback
packet. If the we're within thedeadline, then the loop executes again, if we got a packet, we'll return
the packet, and if we're outside the deadline, a
-1
is returned.This upside is that this patch speeds up the publish method by over
100x. Here is the benchmark:
Before this patch:
After this patch:
The downside is that the timeout isn't exact. Since
IO.select
timesout every
0.5
seconds (according to theSELECT_TIMEOUT
constant),the deadline in the
publish
method could be missed by that amount oftime.
Refs #115