diff --git a/lib/scsocket.js b/lib/scsocket.js index 1592f59..f24c356 100644 --- a/lib/scsocket.js +++ b/lib/scsocket.js @@ -802,12 +802,13 @@ SCSocket.prototype._trySubscribe = function (channel) { options.waitForAuth = true; subscriptionOptions.waitForAuth = options.waitForAuth; } - if (channel.batch) { - options.batch = true; - } if (channel.data) { subscriptionOptions.data = channel.data; } + if (channel.batch) { + options.batch = true; + subscriptionOptions.batch = true; + } channel._pendingSubscriptionCid = this.transport.emit( '#subscribe', subscriptionOptions, options, diff --git a/lib/scsocketcreator.js b/lib/scsocketcreator.js index 6b301f0..527c639 100644 --- a/lib/scsocketcreator.js +++ b/lib/scsocketcreator.js @@ -70,6 +70,7 @@ function connect(options) { authTokenName: 'socketCluster.authToken', binaryType: 'arraybuffer', multiplex: true, + pubSubBatchDuration: null, cloneData: false }; for (var i in options) { diff --git a/lib/sctransport.js b/lib/sctransport.js index 5260074..1afea62 100644 --- a/lib/sctransport.js +++ b/lib/sctransport.js @@ -211,6 +211,27 @@ SCTransport.prototype._onClose = function (code, data) { } }; +SCTransport.prototype._handleEventObject = function (obj, message) { + if (obj && obj.event != null) { + var response = new Response(this, obj.cid); + Emitter.prototype.emit.call(this, 'event', obj.event, obj.data, response); + } else if (obj && obj.rid != null) { + var eventObject = this._callbackMap[obj.rid]; + if (eventObject) { + clearTimeout(eventObject.timeout); + delete eventObject.timeout; + delete this._callbackMap[obj.rid]; + + if (eventObject.callback) { + var rehydratedError = scErrors.hydrateError(obj.error); + eventObject.callback(rehydratedError, obj.data); + } + } + } else { + Emitter.prototype.emit.call(this, 'event', 'raw', message); + } +}; + SCTransport.prototype._onMessage = function (message) { Emitter.prototype.emit.call(this, 'event', 'message', message); @@ -223,26 +244,13 @@ SCTransport.prototype._onMessage = function (message) { this.sendObject('#2'); } } else { - var event = obj.event; - - if (event) { - var response = new Response(this, obj.cid); - Emitter.prototype.emit.call(this, 'event', event, obj.data, response); - } else if (obj.rid != null) { - - var eventObject = this._callbackMap[obj.rid]; - if (eventObject) { - clearTimeout(eventObject.timeout); - delete eventObject.timeout; - delete this._callbackMap[obj.rid]; - - if (eventObject.callback) { - var rehydratedError = scErrors.hydrateError(obj.error); - eventObject.callback(rehydratedError, obj.data); - } + if (Array.isArray(obj)) { + var len = obj.length; + for (var i = 0; i < len; i++) { + this._handleEventObject(obj[i], message); } } else { - Emitter.prototype.emit.call(this, 'event', 'raw', obj); + this._handleEventObject(obj, message); } } }; @@ -297,11 +305,8 @@ SCTransport.prototype.emitObject = function (eventObject, options) { this._callbackMap[eventObject.cid] = eventObject; } - if (options && options.batch) { - this.sendObjectBatch(simpleEventObject); - } else { - this.sendObject(simpleEventObject); - } + this.sendObject(simpleEventObject, options); + return eventObject.cid || null; }; @@ -391,20 +396,16 @@ SCTransport.prototype.serializeObject = function (object) { return null; }; -SCTransport.prototype.sendObject = function (object) { - var str = this.serializeObject(object); - if (str != null) { - this.send(str); - } -}; - SCTransport.prototype.sendObjectBatch = function (object) { var self = this; - clearTimeout(this._batchTimeout); this._batchSendList.push(object); + if (this._batchTimeout) { + return; + } this._batchTimeout = setTimeout(function () { + delete self._batchTimeout; if (self._batchSendList.length) { var str = self.serializeObject(self._batchSendList); if (str != null) { @@ -412,7 +413,22 @@ SCTransport.prototype.sendObjectBatch = function (object) { } self._batchSendList = []; } - }, 0); + }, this.options.pubSubBatchDuration || 0); +}; + +SCTransport.prototype.sendObjectSingle = function (object) { + var str = this.serializeObject(object); + if (str != null) { + this.send(str); + } +}; + +SCTransport.prototype.sendObject = function (object, options) { + if (options && options.batch) { + this.sendObjectBatch(object); + } else { + this.sendObjectSingle(object); + } }; module.exports.SCTransport = SCTransport; diff --git a/package.json b/package.json index c946342..7d7aa15 100644 --- a/package.json +++ b/package.json @@ -23,7 +23,7 @@ "component-emitter": "1.2.1", "linked-list": "0.1.0", "querystring": "0.2.0", - "sc-channel": "~1.1.0", + "sc-channel": "~1.2.0", "sc-errors": "~1.3.3", "sc-formatter": "~3.0.0", "ws": "3.1.0"