Skip to content

Commit

Permalink
Fire blocked/unblocked events
Browse files Browse the repository at this point in the history
Events are fired when underlying amqplib detects RabbitMQ node is low on a resource (memory or disk)

* https://www.rabbitmq.com/connection-blocked.html
  • Loading branch information
Boris Petelj committed Mar 19, 2018
1 parent b1e056d commit bbd741e
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/AmqpConnectionManager.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,13 @@ class AmqpConnectionManager extends EventEmitter
.then (connection) =>
@_currentConnection = connection

#emit 'blocked' when RabbitMQ server decides to block the connection (resources running low)
connection.on 'blocked', (reason) =>
@emit 'blocked', {reason}

connection.on 'unblocked', () =>
@emit 'unblocked'

# Reconnect if the broker goes away.
connection.on 'error', (err) =>
Promise.resolve()
Expand Down
26 changes: 26 additions & 0 deletions test/AmqpConnectionManagerTest.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -221,3 +221,29 @@ describe 'AmqpConnectionManager', ->
expect(connectsSeen).to.equal 1

.then resolve, reject

it 'should detect connection block/unblock', ->
new Promise (resolve, reject) ->
amqp = new AmqpConnectionManager('amqp://localhost')
connectsSeen = 0
blockSeen = 0
unblockSeen = 0

amqp.on 'blocked', ({err}) ->
blockSeen++

amqp.on 'unblocked', () ->
unblockSeen++

amqp.once 'connect', ({connection, url}) ->
connectsSeen++
# Close the connection nicely
amqplib.simulateRemoteBlock()
amqplib.simulateRemoteUnblock()

Promise.resolve().then ->
expect(connectsSeen).to.equal 1
expect(blockSeen).to.equal 1
expect(unblockSeen).to.equal 1

.then resolve, reject
6 changes: 6 additions & 0 deletions test/fixtures.coffee
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ class exports.FakeAmqp
simulateRemoteClose: ->
@connection.emit 'close', new Error("Connection closed")

simulateRemoteBlock: ->
@connection.emit 'blocked', new Error("Connection blocked")

simulateRemoteUnblock: ->
@connection.emit 'unblocked'

reset: ->
@connection = null
@url = null
Expand Down

0 comments on commit bbd741e

Please sign in to comment.