Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use a queue to wait for Puback packets rather than polling #120

Merged
merged 5 commits into from
Mar 9, 2020

Conversation

tenderlove
Copy link
Contributor

This commit changes the publish method to use a queue to wait for
Puback packets rather than polling a hash. Every time the read loop
gets data or a timeout from IO.select, it will send a message to
everyone waiting for a Puback packet. If the we're within the
deadline, then the loop executes again, if we got a packet, we'll return
the packet, and if we're outside the deadline, a -1 is returned.

This upside is that this patch speeds up the publish method by over
100x. Here is the benchmark:

require "securerandom"
require "mqtt"
require "benchmark/ips"
require "stackprof"

client = MQTT::Client.new(username: 'testuser', password: 'testpasswd', client_id: "client_#{SecureRandom.hex(10)}", host: '127.0.0.1')
client.connect

Benchmark.ips do |x|
  x.report("send message") {
    i = rand(1..10)
    topic = "to/timebox#{i}/cameras"
    msg = "message #{Time.now} for timebox#{i}"
    client.publish(topic, msg, true, 1)
  }
end

Before this patch:

$ ruby -I lib thing.rb
Warming up --------------------------------------
        send message     8.000  i/100ms
Calculating -------------------------------------
        send message     85.042  (± 4.7%) i/s -    432.000  in   5.089261s

After this patch:

$ ruby -I lib thing.rb
Warming up --------------------------------------
        send message   915.000  i/100ms
Calculating -------------------------------------
        send message      9.453k (± 4.5%) i/s -     47.580k in   5.043716s

The downside is that the timeout isn't exact. Since IO.select times
out every 0.5 seconds (according to the SELECT_TIMEOUT constant),
the deadline in the publish method could be missed by that amount of
time.

Refs #115

This commit changes the `publish` method to use a queue to wait for
Puback packets rather than polling a hash.  Every time the read loop
gets data or a timeout from `IO.select`, it will send a message to
everyone waiting for a `Puback` packet.  If the we're within the
deadline, then the loop executes again, if we got a packet, we'll return
the packet, and if we're outside the deadline, a `-1` is returned.

This upside is that this patch speeds up the publish method by over
100x.  Here is the benchmark:

```ruby
require "securerandom"
require "mqtt"
require "benchmark/ips"
require "stackprof"

client = MQTT::Client.new(username: 'testuser', password: 'testpasswd', client_id: "client_#{SecureRandom.hex(10)}", host: '127.0.0.1')
client.connect

Benchmark.ips do |x|
  x.report("send message") {
    i = rand(1..10)
    topic = "to/timebox#{i}/cameras"
    msg = "message #{Time.now} for timebox#{i}"
    client.publish(topic, msg, true, 1)
  }
end
```

Before this patch:

```
$ ruby -I lib thing.rb
Warming up --------------------------------------
        send message     8.000  i/100ms
Calculating -------------------------------------
        send message     85.042  (± 4.7%) i/s -    432.000  in   5.089261s
```

After this patch:

```
$ ruby -I lib thing.rb
Warming up --------------------------------------
        send message   915.000  i/100ms
Calculating -------------------------------------
        send message      9.453k (± 4.5%) i/s -     47.580k in   5.043716s
```

The downside is that the timeout isn't exact.  Since `IO.select` times
out every `0.5` seconds (according to the `SELECT_TIMEOUT` constant),
the deadline in the `publish` method could be missed by that amount of
time.

Refs njh#115
@njh
Copy link
Owner

njh commented Jan 27, 2020

Awesome, that is great, thanks!

I hated that there was a sleep in the code but had forgotten about it when those performance issues were brought up.

I will get those old/broken versions of ruby removed from Travis, then get this PR merged.

@tenderlove
Copy link
Contributor Author

@njh 🙇🏻‍♀️ thank you for making this library!

@tenderlove
Copy link
Contributor Author

While I was thinking about this, I realized I've kind of broken the timeout code. We need to add this line:

diff --git a/lib/mqtt/client.rb b/lib/mqtt/client.rb
index 707f932..e6e9884 100644
--- a/lib/mqtt/client.rb
+++ b/lib/mqtt/client.rb
@@ -459,6 +459,7 @@ module MQTT
     def receive_packet
       # Poll socket - is there data waiting?
       result = IO.select([@socket], [], [], SELECT_TIMEOUT)
+      handle_timeouts
       unless result.nil?
         # Yes - read in the packet
         packet = MQTT::Packet.read(@socket)

I'm trying to figure out how to write a test for it though. It doesn't seem like there's a way to get the read thread to run in the test framework.

@tenderlove
Copy link
Contributor Author

@njh I added a test for the timeout, but the test is extremely hacky :( It does ensure we hit our timeouts though.

njh and others added 2 commits January 28, 2020 16:13
Removed older rubies, no-longer in rvm
@njh njh merged commit bd163ed into njh:master Mar 9, 2020
@njh
Copy link
Owner

njh commented Mar 9, 2020

Sorry, I completely forgot to merge this. Done now!
Thank you very much for this contribution.

@tenderlove tenderlove deleted the speed-up-publish branch March 8, 2021 23:16
@njh njh mentioned this pull request Feb 16, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants