From bbd741e6b82acaf8af533a933912213070f4bdf7 Mon Sep 17 00:00:00 2001 From: Boris Petelj Date: Mon, 19 Mar 2018 16:25:07 +0100 Subject: [PATCH] Fire blocked/unblocked events Events are fired when underlying amqplib detects RabbitMQ node is low on a resource (memory or disk) * https://www.rabbitmq.com/connection-blocked.html --- src/AmqpConnectionManager.coffee | 7 +++++++ test/AmqpConnectionManagerTest.coffee | 26 ++++++++++++++++++++++++++ test/fixtures.coffee | 6 ++++++ 3 files changed, 39 insertions(+) diff --git a/src/AmqpConnectionManager.coffee b/src/AmqpConnectionManager.coffee index dd97603..1521af3 100644 --- a/src/AmqpConnectionManager.coffee +++ b/src/AmqpConnectionManager.coffee @@ -102,6 +102,13 @@ class AmqpConnectionManager extends EventEmitter .then (connection) => @_currentConnection = connection + #emit 'blocked' when RabbitMQ server decides to block the connection (resources running low) + connection.on 'blocked', (reason) => + @emit 'blocked', {reason} + + connection.on 'unblocked', () => + @emit 'unblocked' + # Reconnect if the broker goes away. connection.on 'error', (err) => Promise.resolve() diff --git a/test/AmqpConnectionManagerTest.coffee b/test/AmqpConnectionManagerTest.coffee index eaddfd9..4869428 100644 --- a/test/AmqpConnectionManagerTest.coffee +++ b/test/AmqpConnectionManagerTest.coffee @@ -221,3 +221,29 @@ describe 'AmqpConnectionManager', -> expect(connectsSeen).to.equal 1 .then resolve, reject + + it 'should detect connection block/unblock', -> + new Promise (resolve, reject) -> + amqp = new AmqpConnectionManager('amqp://localhost') + connectsSeen = 0 + blockSeen = 0 + unblockSeen = 0 + + amqp.on 'blocked', ({err}) -> + blockSeen++ + + amqp.on 'unblocked', () -> + unblockSeen++ + + amqp.once 'connect', ({connection, url}) -> + connectsSeen++ + # Close the connection nicely + amqplib.simulateRemoteBlock() + amqplib.simulateRemoteUnblock() + + Promise.resolve().then -> + expect(connectsSeen).to.equal 1 + expect(blockSeen).to.equal 1 + expect(unblockSeen).to.equal 1 + + .then resolve, reject diff --git a/test/fixtures.coffee b/test/fixtures.coffee index 398ad8f..23bdd9f 100644 --- a/test/fixtures.coffee +++ b/test/fixtures.coffee @@ -13,6 +13,12 @@ class exports.FakeAmqp simulateRemoteClose: -> @connection.emit 'close', new Error("Connection closed") + simulateRemoteBlock: -> + @connection.emit 'blocked', new Error("Connection blocked") + + simulateRemoteUnblock: -> + @connection.emit 'unblocked' + reset: -> @connection = null @url = null