diff --git a/lib/statsd.js b/lib/statsd.js index f27383b..86ff3cc 100644 --- a/lib/statsd.js +++ b/lib/statsd.js @@ -1,33 +1,49 @@ var dgram = require('dgram'), dns = require('dns'); +/** + * @const + */ +var DEFAULT_MAX_BATCH_DELAY = 1000; + +/** + * @const + */ +var DEFAULT_MAX_BATCH_LENGTH = 1500; + /** * The UDP Client for StatsD * @param options - * @option host {String} The host to connect to default: localhost - * @option port {String|Integer} The port to connect to default: 8125 - * @option prefix {String} An optional prefix to assign to each stat name sent - * @option suffix {String} An optional suffix to assign to each stat name sent - * @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace - * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once - * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. - * @option global_tags {Array=} Optional tags that will be added to every metric + * @option host {String} The host to connect to default: localhost + * @option port {String|Integer} The port to connect to default: 8125 + * @option prefix {String} An optional prefix to assign to each stat name sent + * @option suffix {String} An optional suffix to assign to each stat name sent + * @option globalize {boolean} An optional boolean to add "statsd" as an object in the global namespace + * @option cacheDns {boolean} An optional option to only lookup the hostname -> ip address once + * @option mock {boolean} An optional boolean indicating this Client is a mock object, no stats are sent. + * @option global_tags {Array=} Optional tags that will be added to every metric + * @option batch {boolean} Whether to send metrics in batches + * @option maxBatchDelay {Integer} Maximum period for batch accumulation in millisecond + * @option maxBatchLength {Integer} Maximum length of a packet, should not be more then the network MTU * @constructor */ -var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags) { +var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, global_tags, batch, maxBatchDelay, maxBatchLength) { var options = host || {}, self = this; if(arguments.length > 1 || typeof(host) === 'string'){ options = { - host : host, - port : port, - prefix : prefix, - suffix : suffix, - globalize : globalize, - cacheDns : cacheDns, - mock : mock === true, - global_tags : global_tags + host : host, + port : port, + prefix : prefix, + suffix : suffix, + globalize : globalize, + cacheDns : cacheDns, + mock : mock === true, + global_tags : global_tags, + batch : batch, + maxBatchDelay : maxBatchDelay, + maxBatchLength : maxBatchLength }; } @@ -38,6 +54,9 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl this.socket = dgram.createSocket('udp4'); this.mock = options.mock; this.global_tags = options.global_tags || []; + this.batch = options.batch; + this.maxBatchDelay = options.maxBatchDelay || DEFAULT_MAX_BATCH_DELAY; + this.maxBatchLength = options.maxBatchLength || DEFAULT_MAX_BATCH_LENGTH; if(options.cacheDns === true){ dns.lookup(options.host, function(err, address, family){ @@ -50,6 +69,56 @@ var Client = function (host, port, prefix, suffix, globalize, cacheDns, mock, gl if(options.globalize){ global.statsd = this; } + + if (options.batch) { + this.pendingMessages = []; + this.batchLength = 0; + } +}; + +Client.prototype._sendBatch = function(batchCallback) { + if (this.pendingMessages.length) { + var batchMessage = this.pendingMessages.map(function(msg) { return msg.message; }).join('\n'); + var callbacks = this.pendingMessages.map(function(msg) { return msg.callback; }); + this._send(batchMessage, function(err) { + if (typeof batchCallback === 'function') { + batchCallback(err); + } + callbacks.forEach(function(callback) { + if (typeof callback === 'function') { + callback(err); + } + }); + }); + this.pendingMessages = []; + this.batchLength = 0; + } else if (typeof batchCallback === 'function') { + batchCallback(); + } +}; + +Client.prototype._addToBatch = function(message, callback) { + if (!!message) { + if (this.batchLength + message.length > this.maxBatchLength) { + this._sendBatch(); + // Reset the timer as we've just sent the batches + if (this.sendTimeout) { + clearTimeout(this.sendTimeout); + this.sendTimeout = null; + } + } else if (!this.sendTimeout) { + // If we are not sending a batch now and didn't have a send timeout yet + // we need to init it to ensure this message will be sent not more that + // after maxBatchDelay + this.sendTimeout = setTimeout(this._sendBatch.bind(this), this.maxBatchDelay); + } + + this.pendingMessages.push({ + message: message, + callback: callback + }); + this.batchLength += message.length; + } }; /** @@ -215,6 +284,14 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) message += '|#' + merged_tags.join(','); } + if (this.batch) { + this._addToBatch(message, callback); + } else { + this._send(message, callback); + } +}; + +Client.prototype._send = function(message, callback) { // Only send this stat if we're not a mock Client. if(!this.mock) { buf = new Buffer(message); @@ -229,9 +306,22 @@ Client.prototype.send = function (stat, value, type, sampleRate, tags, callback) /** * Close the underlying socket and stop listening for data on it. */ -Client.prototype.close = function(){ - this.socket.close(); -} +Client.prototype.close = function() { + var self = this; + if (self.batch) { + // Clear the send timeout + if (self.sendTimeout) { + clearTimeout(this.sendTimeout); + self.sendTimeout = null; + } + // And send pending messages immediately + self._sendBatch(function() { + self.socket.close(); + }); + } else { + self.socket.close(); + } +}; exports = module.exports = Client; exports.StatsD = Client; diff --git a/package.json b/package.json index 5a672b3..f47efd7 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name" : "node-statsd" , "description" : "node client for Etsy'd StatsD server" -, "version" : "0.1.1" +, "version" : "0.1.2" , "author" : "Steve Ivy" , "contributors": [ "Russ Bradberry " ] , "repository" : diff --git a/test/test_statsd.js b/test/test_statsd.js index 0fbb314..7063141 100644 --- a/test/test_statsd.js +++ b/test/test_statsd.js @@ -680,4 +680,75 @@ describe('StatsD', function(){ }); }); + describe('#batching', function() { + it('should send batches not longer than the max length', function(finished) { + udpTest(function(message, server) { + assert.equal(message, 'a:1|c\na:1|c'); + server.close(); + finished(); + }, function(server) { + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + batch: true, + maxBatchLength: 10 + }); + for (var i = 0; i < 5; i++) { + statsd.increment('a', 1); + } + }); + }); + it('should send a batch after one second', function(finished) { + udpTest(function(message, server) { + assert.equal(message, 'a:1|c'); + server.close(); + finished(); + }, function(server) { + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + batch: true + }); + statsd.increment('a', 1); + }); + }); + it('should send batches of different messages', function(finished) { + udpTest(function(message, server) { + assert.equal(message, 'a:1|c\nb:-1|c\nc:1|g\nd:2|h\ne:3|ms\nf:5|s'); + server.close(); + finished(); + }, function(server) { + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + batch: true, + }); + statsd.increment('a', 1); + statsd.decrement('b', 1); + statsd.gauge('c', 1); + statsd.histogram('d', 2); + statsd.timing('e', 3); + statsd.set('f', 5); + }); + }); + it('should send pending messages after client is closed', function(finished) { + udpTest(function(message, server) { + assert.equal(message, 'a:1|c'); + server.close(); + finished(); + }, function(server) { + var address = server.address(), + statsd = new StatsD({ + host: address.address, + port: address.port, + batch: true + }); + statsd.increment('a', 1); + statsd.close(); + }); + }); + }); });