Skip to content
This repository has been archived by the owner on Sep 19, 2024. It is now read-only.

Added batching mode #61

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 110 additions & 20 deletions lib/statsd.js
Original file line number Diff line number Diff line change
@@ -1,33 +1,49 @@
var dgram = require('dgram'),
dns = require('dns');

/**
* @const
*/
var DEFAULT_MAX_BATCH_DELAY = 1000;

/**
* @const
*/
var DEFAULT_MAX_BATCH_LENGTH = 1500;

/**
* The UDP Client for StatsD
* @param options
* @option host {String} The host to connect to default: localhost
* @option port {String|Integer} The port to connect to default: 8125
* @option prefix {String} An optional prefix to assign to each stat name sent
* @option suffix {String} An optional suffix to assign to each stat name sent
* @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace
* @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once
* @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent.
* @option global_tags {Array=} Optional tags that will be added to every metric
* @option host {String} The host to connect to default: localhost
* @option port {String|Integer} The port to connect to default: 8125
* @option prefix {String} An optional prefix to assign to each stat name sent
* @option suffix {String} An optional suffix to assign to each stat name sent
* @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace
* @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once
* @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent.
* @option global_tags {Array=} Optional tags that will be added to every metric
* @option batch {boolean} Whether to send metrics in batches
* @option maxBatchDelay {Integer} Maximum period for batch accumulation in millisecond
* @option maxBatchLength {Integer} Maximum length of a packet, should not be more then the network MTU
* @constructor
*/
var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) {
var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, batch, maxBatchDelay, maxBatchLength) {
var options = host || {},
self = this;

if(arguments.length > 1 || typeof(host) === 'string'){
options = {
host : host,
port : port,
prefix : prefix,
suffix : suffix,
globalize : globalize,
cacheDns : cacheDns,
mock : mock === true,
global_tags : global_tags
host : host,
port : port,
prefix : prefix,
suffix : suffix,
globalize : globalize,
cacheDns : cacheDns,
mock : mock === true,
global_tags : global_tags,
batch : batch,
maxBatchDelay : maxBatchDelay,
maxBatchLength : maxBatchLength
};
}

Expand All @@ -38,6 +54,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl
this.socket = dgram.createSocket('udp4');
this.mock = options.mock;
this.global_tags = options.global_tags || [];
this.batch = options.batch;
this.maxBatchDelay = options.maxBatchDelay || DEFAULT_MAX_BATCH_DELAY;
this.maxBatchLength = options.maxBatchLength || DEFAULT_MAX_BATCH_LENGTH;

if(options.cacheDns === true){
dns.lookup(options.host, function(err, address, family){
Expand All @@ -50,6 +69,56 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl
if(options.globalize){
global.statsd = this;
}

if (options.batch) {
this.pendingMessages = [];
this.batchLength = 0;
}
};

Client.prototype._sendBatch = function(batchCallback) {
if (this.pendingMessages.length) {
var batchMessage = this.pendingMessages.map(function(msg) { return msg.message; }).join('\n');
var callbacks = this.pendingMessages.map(function(msg) { return msg.callback; });
this._send(batchMessage, function(err) {
if (typeof batchCallback === 'function') {
batchCallback(err);
}
callbacks.forEach(function(callback) {
if (typeof callback === 'function') {
callback(err);
}
});
});
this.pendingMessages = [];
this.batchLength = 0;
} else if (typeof batchCallback === 'function') {
batchCallback();
}
};

Client.prototype._addToBatch = function(message, callback) {
if (!!message) {
if (this.batchLength + message.length > this.maxBatchLength) {
this._sendBatch();
// Reset the timer as we've just sent the batches
if (this.sendTimeout) {
clearTimeout(this.sendTimeout);
this.sendTimeout = null;
}
} else if (!this.sendTimeout) {
// If we are not sending a batch now and didn't have a send timeout yet
// we need to init it to ensure this message will be sent not more that
// after maxBatchDelay
this.sendTimeout = setTimeout(this._sendBatch.bind(this), this.maxBatchDelay);
}

this.pendingMessages.push({
message: message,
callback: callback
});
this.batchLength += message.length;
}
};

/**
Expand Down Expand Up @@ -215,6 +284,14 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback)
message += '|#' + merged_tags.join(',');
}

if (this.batch) {
this._addToBatch(message, callback);
} else {
this._send(message, callback);
}
};

Client.prototype._send = function(message, callback) {
// Only send this stat if we're not a mock Client.
if(!this.mock) {
buf = new Buffer(message);
Expand All @@ -229,9 +306,22 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback)
/**
* Close the underlying socket and stop listening for data on it.
*/
Client.prototype.close = function(){
this.socket.close();
}
Client.prototype.close = function() {
var self = this;
if (self.batch) {
// Clear the send timeout
if (self.sendTimeout) {
clearTimeout(this.sendTimeout);
self.sendTimeout = null;
}
// And send pending messages immediately
self._sendBatch(function() {
self.socket.close();
});
} else {
self.socket.close();
}
};

exports = module.exports = Client;
exports.StatsD = Client;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{ "name" : "node-statsd"
, "description" : "node client for Etsy'd StatsD server"
, "version" : "0.1.1"
, "version" : "0.1.2"
, "author" : "Steve Ivy"
, "contributors": [ "Russ Bradberry <[email protected]>" ]
, "repository" :
Expand Down
71 changes: 71 additions & 0 deletions test/test_statsd.js
Original file line number Diff line number Diff line change
Expand Up @@ -680,4 +680,75 @@ describe('StatsD', function(){
});
});

describe('#batching', function() {
it('should send batches not longer than the max length', function(finished) {
udpTest(function(message, server) {
assert.equal(message, 'a:1|c\na:1|c');
server.close();
finished();
}, function(server) {
var address = server.address(),
statsd = new StatsD({
host: address.address,
port: address.port,
batch: true,
maxBatchLength: 10
});
for (var i = 0; i < 5; i++) {
statsd.increment('a', 1);
}
});
});
it('should send a batch after one second', function(finished) {
udpTest(function(message, server) {
assert.equal(message, 'a:1|c');
server.close();
finished();
}, function(server) {
var address = server.address(),
statsd = new StatsD({
host: address.address,
port: address.port,
batch: true
});
statsd.increment('a', 1);
});
});
it('should send batches of different messages', function(finished) {
udpTest(function(message, server) {
assert.equal(message, 'a:1|c\nb:-1|c\nc:1|g\nd:2|h\ne:3|ms\nf:5|s');
server.close();
finished();
}, function(server) {
var address = server.address(),
statsd = new StatsD({
host: address.address,
port: address.port,
batch: true,
});
statsd.increment('a', 1);
statsd.decrement('b', 1);
statsd.gauge('c', 1);
statsd.histogram('d', 2);
statsd.timing('e', 3);
statsd.set('f', 5);
});
});
it('should send pending messages after client is closed', function(finished) {
udpTest(function(message, server) {
assert.equal(message, 'a:1|c');
server.close();
finished();
}, function(server) {
var address = server.address(),
statsd = new StatsD({
host: address.address,
port: address.port,
batch: true
});
statsd.increment('a', 1);
statsd.close();
});
});
});
});