Skip to content

Commit

Permalink
feat: Set 'this' to be the channel wrapper in the setup function.
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason Walton committed Sep 25, 2018
1 parent 53bcf6d commit 551200f
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 45 deletions.
94 changes: 52 additions & 42 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,38 @@
[![devDependency Status](https://david-dm.org/benbria/node-amqp-connection-manager/dev-status.svg)](https://david-dm.org/benbria/node-amqp-connection-manager#info=devDependencies)
[![peerDependency Status](https://david-dm.org/benbria/node-amqp-connection-manager/peer-status.svg)](https://david-dm.org/benbria/node-amqp-connection-manager#info=peerDependencies)


Connection management for amqplib.

amqp-connection-manager
=======================
# amqp-connection-manager

Features:
---------
## Features

* Automatically reconnect when your amqplib broker dies in a fire.
* Round-robin connections between multiple brokers in a cluster.
* If messages are sent while the broker is unavailable, queues messages in memory until we reconnect.
* Supports both promises and callbacks (using [promise-breaker](https://github.com/jwalton/node-promise-breaker))
* Very un-opinionated library - a thin wrapper around amqplib.

Installation:
-------------
## Installation

npm install --save amqplib amqp-connection-manager

Basics:
-------
## Basics

The basic idea here is that, usually, when you create a new channel, you do some setup work at the beginning (like
asserting that various queues or exchanges exist, or binding to queues), and then you send and receive messages and
you never touch that stuff again.
The basic idea here is that, usually, when you create a new channel, you do some
setup work at the beginning (like asserting that various queues or exchanges
exist, or binding to queues), and then you send and receive messages and you
never touch that stuff again.

amqp-connection-manager will reconnect to a new broker whenever the broker it is currently connected to dies. When you
ask amqp-connection-manager for a channel, you specify one or more `setup` functions to run; the setup functions will
be run every time amqp-connection-manager reconnects, to make sure your channel and broker are in a sane state.
amqp-connection-manager will reconnect to a new broker whenever the broker it is
currently connected to dies. When you ask amqp-connection-manager for a
channel, you specify one or more `setup` functions to run; the setup functions
will be run every time amqp-connection-manager reconnects, to make sure your
channel and broker are in a sane state.

Before we get into an example, note this example is written using Promises, however much like amqplib, any
function which returns a Promise will also accept a callback as an optional parameter.
Before we get into an example, note this example is written using Promises,
however much like amqplib, any function which returns a Promise will also accept
a callback as an optional parameter.

Here's the example:

Expand All @@ -49,12 +48,13 @@ var amqp = require('amqp-connection-manager');
// Create a new connection manager
var connection = amqp.connect(['amqp://localhost']);

// Ask the connection manager for a ChannelWrapper. Specify a setup function to run every time we reconnect
// to the broker.
// Ask the connection manager for a ChannelWrapper. Specify a setup function to
// run every time we reconnect to the broker.
var channelWrapper = connection.createChannel({
json: true,
setup: function(channel) {
// `channel` here is a regular amqplib `ConfirmChannel`.
// Note that `this` here is the channelWrapper instance.
return channel.assertQueue('rxQueueName', {durable: true}),
}
});
Expand All @@ -64,15 +64,16 @@ var channelWrapper = connection.createChannel({
// when the message is actually sent (or not sent.)
channelWrapper.sendToQueue('rxQueueName', {hello: 'world'})
.then(function() {
return console.log("Message was sent! Hooray!");
return console.log("Message was sent! Hooray!");
}).catch(function(err) {
return console.log("Message was rejected... Boo!");
return console.log("Message was rejected... Boo!");
});
```

Sometimes it's handy to modify a channel at run time. For example, suppose you have a channel that's listening to
one kind of message, and you decide you now also want to listen to some other kind of message. This can be done
by adding a new setup function to an existing ChannelWrapper:
Sometimes it's handy to modify a channel at run time. For example, suppose you
have a channel that's listening to one kind of message, and you decide you now
also want to listen to some other kind of message. This can be done by adding a
new setup function to an existing ChannelWrapper:

```js
channelWrapper.addSetup(function(channel) {
Expand All @@ -84,21 +85,24 @@ channelWrapper.addSetup(function(channel) {
});
```

`addSetup()` returns a Promise which resolves when the setup function is finished (or immediately, if the underlying
connection is not currently connected to a broker.) There is also a `removeSetup(setup, teardown)` which will run
`teardown(channel)` if the channel is currently connected to a broker (and will not run `teardown` at all otherwise.)
Note that `setup` and `teardown` *must* either accept a callback or return a Promise.
`addSetup()` returns a Promise which resolves when the setup function is
finished (or immediately, if the underlying connection is not currently
connected to a broker.) There is also a `removeSetup(setup, teardown)` which
will run `teardown(channel)` if the channel is currently connected to a broker
(and will not run `teardown` at all otherwise.) Note that `setup` and `teardown`
*must* either accept a callback or return a Promise.

See a complete example in the [examples](./examples) folder.

API:
----
## API

### connect(urls, options)

Creates a new AmqpConnectionManager, which will connect to one of the URLs provided in `urls`. If a broker is
unreachable or dies, then AmqpConnectionManager will try the next available broker, round-robin.

Options:

* `options.heartbeatIntervalInSeconds` - Interval to send heartbeats to broker. Defaults to 5 seconds.
* `options.reconnectTimeInSeconds` - The time to wait before trying to reconnect. If not specified,
defaults to `heartbeatIntervalInSeconds`.
Expand All @@ -109,37 +113,43 @@ Options:
* `options.connectionOptions` is passed as options to the amqplib connect method.

### AmqpConnectionManager events

* `connect({connection, url})` - Emitted whenever we successfully connect to a broker.
* `disconnect({err})` - Emitted whenever we disconnect from a broker.


### AmqpConnectionManager#createChannel(options)

Create a new ChannelWrapper. This is a proxy for the actual channel (which may or may not exist at any moment,
depending on whether or not we are currently connected.)

Options:

* `options.name` - Name for this channel. Used for debugging.
* `options.setup(channel, [cb])` - A function to call whenever we reconnect to the broker (and therefore create a new
underlying channel.) This function should either accept a callback, or return a Promise. See `addSetup` below.
* `options.setup(channel, [cb])` - A function to call whenever we reconnect to the
broker (and therefore create a new underlying channel.) This function should
either accept a callback, or return a Promise. See `addSetup` below.
Note that `this` inside the setup function will the returned ChannelWrapper.
The ChannelWrapper has a special `context` member you can use to store
arbitrary data in.
* `options.json` if true, then ChannelWrapper assumes all messages passed to `publish()` and `sendToQueue()`
are plain JSON objects. These will be encoded automatically before being sent.


### AmqpConnectionManager#isConnected()
Returns true if the AmqpConnectionManager is connected to a broker, false otherwise.

Returns true if the AmqpConnectionManager is connected to a broker, false otherwise.

### AmqpConnectionManager#close()
Close this AmqpConnectionManager and free all associated resources.

Close this AmqpConnectionManager and free all associated resources.

### ChannelWrapper events

* `connect` - emitted every time this channel connects or reconnects.
* `error(err, {name})` - emitted if an error occurs setting up the channel.
* `close` - emitted when this channel closes via a call to `close()`


### ChannelWrapper#addSetup(setup)

Adds a new 'setup handler'.

`setup(channel, [cb])` is a function to call when a new underlying channel is created - handy for asserting
Expand All @@ -155,27 +165,27 @@ reconnects, even if it throws an error.)
Setup functions should, ideally, not throw errors, but if they do then the ChannelWrapper will emit an 'error'
event.


### ChannelWrapper#removeSetup(setup, teardown)

Removes a setup handler. If the channel is currently connected, will call `teardown(channel)`, passing in the
underlying amqplib ConfirmChannel. `teardown` should either take a callback or return a Promise.


### ChannelWrapper#publish and ChannelWrapper#sendToQueue

These work exactly like their counterparts in amqplib's Channel, except that they return a Promise (or accept a
callback) which resolves when the message is confirmed to have been delivered to the broker. The promise rejects if
either the broker refuses the message, or if `close()` is called on the ChannelWrapper before the message can be
delivered.


### ChannelWrapper#ack and ChannelWrapper#nack

These are just aliases for calling `ack()` and `nack()` on the underlying channel. They do nothing if the underlying
channel is not connected.


### ChannelWrapper#queueLength()
Returns a count of messages currently waiting to be sent to the underlying channel.

Returns a count of messages currently waiting to be sent to the underlying channel.

### ChannelWrapper#close()

Close a channel, clean up resources associated with it.
8 changes: 5 additions & 3 deletions src/ChannelWrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export default class ChannelWrapper extends EventEmitter {
.then(() => {
this._setups.push(setup);
if(this._channel) {
return pb.call(setup, null, this._channel);
return pb.call(setup, this, this._channel);
} else {
return undefined;
}
Expand All @@ -65,7 +65,7 @@ export default class ChannelWrapper extends EventEmitter {

return (this._settingUp || Promise.resolve())
.then(() => this._channel
? pb.call(teardown, null, this._channel)
? pb.call(teardown, this, this._channel)
: undefined
);
});
Expand Down Expand Up @@ -156,6 +156,8 @@ export default class ChannelWrapper extends EventEmitter {
this._connectionManager = connectionManager;
this.name = options.name;

this.context = {};

this._json = ('json' in options) ? options.json : false;

// Place to store queued messages.
Expand Down Expand Up @@ -206,7 +208,7 @@ export default class ChannelWrapper extends EventEmitter {
this._settingUp = Promise.all(
this._setups.map(setupFn =>
// TODO: Use a timeout here to guard against setupFns that never resolve?
pb.call(setupFn, null, channel)
pb.call(setupFn, this, channel)
.catch(err => {
if(this._channel) {
this.emit('error', err, { name: this.name });
Expand Down
16 changes: 16 additions & 0 deletions test/ChannelWrapperTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,22 @@ describe('ChannelWrapper', function() {
expect(setup2.callCount).to.equal(2);
});

it('should set `this` correctly in a setup function', async function() {
let whatIsThis;

const channelWrapper = new ChannelWrapper(connectionManager, {
setup() {
whatIsThis = this;
}
});

connectionManager.simulateConnect();
await channelWrapper.waitForConnect();

expect(whatIsThis).to.equal(channelWrapper);
});


it('should emit an error if a setup function throws', async function() {
const setup1 = sinon.spy(() => Promise.resolve());
const setup2 = sinon.spy(() => Promise.reject(new Error('Boom!')));
Expand Down

0 comments on commit 551200f

Please sign in to comment.