diff --git a/README.md b/README.md index 7fa62d9..b864068 100644 --- a/README.md +++ b/README.md @@ -7,14 +7,11 @@ [![devDependency Status](https://david-dm.org/benbria/node-amqp-connection-manager/dev-status.svg)](https://david-dm.org/benbria/node-amqp-connection-manager#info=devDependencies) [![peerDependency Status](https://david-dm.org/benbria/node-amqp-connection-manager/peer-status.svg)](https://david-dm.org/benbria/node-amqp-connection-manager#info=peerDependencies) - Connection management for amqplib. -amqp-connection-manager -======================= +# amqp-connection-manager -Features: ---------- +## Features * Automatically reconnect when your amqplib broker dies in a fire. * Round-robin connections between multiple brokers in a cluster. @@ -22,24 +19,26 @@ Features: * Supports both promises and callbacks (using [promise-breaker](https://github.com/jwalton/node-promise-breaker)) * Very un-opinionated library - a thin wrapper around amqplib. -Installation: -------------- +## Installation npm install --save amqplib amqp-connection-manager -Basics: -------- +## Basics -The basic idea here is that, usually, when you create a new channel, you do some setup work at the beginning (like -asserting that various queues or exchanges exist, or binding to queues), and then you send and receive messages and -you never touch that stuff again. +The basic idea here is that, usually, when you create a new channel, you do some +setup work at the beginning (like asserting that various queues or exchanges +exist, or binding to queues), and then you send and receive messages and you +never touch that stuff again. -amqp-connection-manager will reconnect to a new broker whenever the broker it is currently connected to dies. When you -ask amqp-connection-manager for a channel, you specify one or more `setup` functions to run; the setup functions will -be run every time amqp-connection-manager reconnects, to make sure your channel and broker are in a sane state. +amqp-connection-manager will reconnect to a new broker whenever the broker it is +currently connected to dies. When you ask amqp-connection-manager for a +channel, you specify one or more `setup` functions to run; the setup functions +will be run every time amqp-connection-manager reconnects, to make sure your +channel and broker are in a sane state. -Before we get into an example, note this example is written using Promises, however much like amqplib, any -function which returns a Promise will also accept a callback as an optional parameter. +Before we get into an example, note this example is written using Promises, +however much like amqplib, any function which returns a Promise will also accept +a callback as an optional parameter. Here's the example: @@ -49,12 +48,13 @@ var amqp = require('amqp-connection-manager'); // Create a new connection manager var connection = amqp.connect(['amqp://localhost']); -// Ask the connection manager for a ChannelWrapper. Specify a setup function to run every time we reconnect -// to the broker. +// Ask the connection manager for a ChannelWrapper. Specify a setup function to +// run every time we reconnect to the broker. var channelWrapper = connection.createChannel({ json: true, setup: function(channel) { // `channel` here is a regular amqplib `ConfirmChannel`. + // Note that `this` here is the channelWrapper instance. return channel.assertQueue('rxQueueName', {durable: true}), } }); @@ -64,15 +64,16 @@ var channelWrapper = connection.createChannel({ // when the message is actually sent (or not sent.) channelWrapper.sendToQueue('rxQueueName', {hello: 'world'}) .then(function() { - return console.log("Message was sent! Hooray!"); + return console.log("Message was sent! Hooray!"); }).catch(function(err) { - return console.log("Message was rejected... Boo!"); + return console.log("Message was rejected... Boo!"); }); ``` -Sometimes it's handy to modify a channel at run time. For example, suppose you have a channel that's listening to -one kind of message, and you decide you now also want to listen to some other kind of message. This can be done -by adding a new setup function to an existing ChannelWrapper: +Sometimes it's handy to modify a channel at run time. For example, suppose you +have a channel that's listening to one kind of message, and you decide you now +also want to listen to some other kind of message. This can be done by adding a +new setup function to an existing ChannelWrapper: ```js channelWrapper.addSetup(function(channel) { @@ -84,21 +85,24 @@ channelWrapper.addSetup(function(channel) { }); ``` -`addSetup()` returns a Promise which resolves when the setup function is finished (or immediately, if the underlying -connection is not currently connected to a broker.) There is also a `removeSetup(setup, teardown)` which will run -`teardown(channel)` if the channel is currently connected to a broker (and will not run `teardown` at all otherwise.) -Note that `setup` and `teardown` *must* either accept a callback or return a Promise. +`addSetup()` returns a Promise which resolves when the setup function is +finished (or immediately, if the underlying connection is not currently +connected to a broker.) There is also a `removeSetup(setup, teardown)` which +will run `teardown(channel)` if the channel is currently connected to a broker +(and will not run `teardown` at all otherwise.) Note that `setup` and `teardown` +*must* either accept a callback or return a Promise. See a complete example in the [examples](./examples) folder. -API: ----- +## API ### connect(urls, options) + Creates a new AmqpConnectionManager, which will connect to one of the URLs provided in `urls`. If a broker is unreachable or dies, then AmqpConnectionManager will try the next available broker, round-robin. Options: + * `options.heartbeatIntervalInSeconds` - Interval to send heartbeats to broker. Defaults to 5 seconds. * `options.reconnectTimeInSeconds` - The time to wait before trying to reconnect. If not specified, defaults to `heartbeatIntervalInSeconds`. @@ -109,37 +113,43 @@ Options: * `options.connectionOptions` is passed as options to the amqplib connect method. ### AmqpConnectionManager events + * `connect({connection, url})` - Emitted whenever we successfully connect to a broker. * `disconnect({err})` - Emitted whenever we disconnect from a broker. - ### AmqpConnectionManager#createChannel(options) + Create a new ChannelWrapper. This is a proxy for the actual channel (which may or may not exist at any moment, depending on whether or not we are currently connected.) Options: + * `options.name` - Name for this channel. Used for debugging. -* `options.setup(channel, [cb])` - A function to call whenever we reconnect to the broker (and therefore create a new - underlying channel.) This function should either accept a callback, or return a Promise. See `addSetup` below. +* `options.setup(channel, [cb])` - A function to call whenever we reconnect to the + broker (and therefore create a new underlying channel.) This function should + either accept a callback, or return a Promise. See `addSetup` below. + Note that `this` inside the setup function will the returned ChannelWrapper. + The ChannelWrapper has a special `context` member you can use to store + arbitrary data in. * `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()` are plain JSON objects. These will be encoded automatically before being sent. - ### AmqpConnectionManager#isConnected() -Returns true if the AmqpConnectionManager is connected to a broker, false otherwise. +Returns true if the AmqpConnectionManager is connected to a broker, false otherwise. ### AmqpConnectionManager#close() -Close this AmqpConnectionManager and free all associated resources. +Close this AmqpConnectionManager and free all associated resources. ### ChannelWrapper events + * `connect` - emitted every time this channel connects or reconnects. * `error(err, {name})` - emitted if an error occurs setting up the channel. * `close` - emitted when this channel closes via a call to `close()` - ### ChannelWrapper#addSetup(setup) + Adds a new 'setup handler'. `setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting @@ -155,27 +165,27 @@ reconnects, even if it throws an error.) Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error' event. - ### ChannelWrapper#removeSetup(setup, teardown) + Removes a setup handler. If the channel is currently connected, will call `teardown(channel)`, passing in the underlying amqplib ConfirmChannel. `teardown` should either take a callback or return a Promise. - ### ChannelWrapper#publish and ChannelWrapper#sendToQueue + These work exactly like their counterparts in amqplib's Channel, except that they return a Promise (or accept a callback) which resolves when the message is confirmed to have been delivered to the broker. The promise rejects if either the broker refuses the message, or if `close()` is called on the ChannelWrapper before the message can be delivered. - ### ChannelWrapper#ack and ChannelWrapper#nack + These are just aliases for calling `ack()` and `nack()` on the underlying channel. They do nothing if the underlying channel is not connected. - ### ChannelWrapper#queueLength() -Returns a count of messages currently waiting to be sent to the underlying channel. +Returns a count of messages currently waiting to be sent to the underlying channel. ### ChannelWrapper#close() + Close a channel, clean up resources associated with it. diff --git a/src/ChannelWrapper.js b/src/ChannelWrapper.js index 171ca51..8ed19b4 100644 --- a/src/ChannelWrapper.js +++ b/src/ChannelWrapper.js @@ -40,7 +40,7 @@ export default class ChannelWrapper extends EventEmitter { .then(() => { this._setups.push(setup); if(this._channel) { - return pb.call(setup, null, this._channel); + return pb.call(setup, this, this._channel); } else { return undefined; } @@ -65,7 +65,7 @@ export default class ChannelWrapper extends EventEmitter { return (this._settingUp || Promise.resolve()) .then(() => this._channel - ? pb.call(teardown, null, this._channel) + ? pb.call(teardown, this, this._channel) : undefined ); }); @@ -156,6 +156,8 @@ export default class ChannelWrapper extends EventEmitter { this._connectionManager = connectionManager; this.name = options.name; + this.context = {}; + this._json = ('json' in options) ? options.json : false; // Place to store queued messages. @@ -206,7 +208,7 @@ export default class ChannelWrapper extends EventEmitter { this._settingUp = Promise.all( this._setups.map(setupFn => // TODO: Use a timeout here to guard against setupFns that never resolve? - pb.call(setupFn, null, channel) + pb.call(setupFn, this, channel) .catch(err => { if(this._channel) { this.emit('error', err, { name: this.name }); diff --git a/test/ChannelWrapperTest.js b/test/ChannelWrapperTest.js index 58cdaa6..eb48b8a 100644 --- a/test/ChannelWrapperTest.js +++ b/test/ChannelWrapperTest.js @@ -62,6 +62,22 @@ describe('ChannelWrapper', function() { expect(setup2.callCount).to.equal(2); }); + it('should set `this` correctly in a setup function', async function() { + let whatIsThis; + + const channelWrapper = new ChannelWrapper(connectionManager, { + setup() { + whatIsThis = this; + } + }); + + connectionManager.simulateConnect(); + await channelWrapper.waitForConnect(); + + expect(whatIsThis).to.equal(channelWrapper); + }); + + it('should emit an error if a setup function throws', async function() { const setup1 = sinon.spy(() => Promise.resolve()); const setup2 = sinon.spy(() => Promise.reject(new Error('Boom!')));