Skip to content

Commit

Permalink
Polished subscription batching feature
Browse files Browse the repository at this point in the history
  • Loading branch information
jondubois committed Sep 23, 2017
1 parent 94406bf commit 9ca1671
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 36 deletions.
7 changes: 4 additions & 3 deletions lib/scsocket.js
Original file line number Diff line number Diff line change
Expand Up @@ -802,12 +802,13 @@ SCSocket.prototype._trySubscribe = function (channel) {
options.waitForAuth = true;
subscriptionOptions.waitForAuth = options.waitForAuth;
}
if (channel.batch) {
options.batch = true;
}
if (channel.data) {
subscriptionOptions.data = channel.data;
}
if (channel.batch) {
options.batch = true;
subscriptionOptions.batch = true;
}

channel._pendingSubscriptionCid = this.transport.emit(
'#subscribe', subscriptionOptions, options,
Expand Down
1 change: 1 addition & 0 deletions lib/scsocketcreator.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ function connect(options) {
authTokenName: 'socketCluster.authToken',
binaryType: 'arraybuffer',
multiplex: true,
pubSubBatchDuration: null,
cloneData: false
};
for (var i in options) {
Expand Down
80 changes: 48 additions & 32 deletions lib/sctransport.js
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,27 @@ SCTransport.prototype._onClose = function (code, data) {
}
};

SCTransport.prototype._handleEventObject = function (obj, message) {
if (obj && obj.event != null) {
var response = new Response(this, obj.cid);
Emitter.prototype.emit.call(this, 'event', obj.event, obj.data, response);
} else if (obj && obj.rid != null) {
var eventObject = this._callbackMap[obj.rid];
if (eventObject) {
clearTimeout(eventObject.timeout);
delete eventObject.timeout;
delete this._callbackMap[obj.rid];

if (eventObject.callback) {
var rehydratedError = scErrors.hydrateError(obj.error);
eventObject.callback(rehydratedError, obj.data);
}
}
} else {
Emitter.prototype.emit.call(this, 'event', 'raw', message);
}
};

SCTransport.prototype._onMessage = function (message) {
Emitter.prototype.emit.call(this, 'event', 'message', message);

Expand All @@ -223,26 +244,13 @@ SCTransport.prototype._onMessage = function (message) {
this.sendObject('#2');
}
} else {
var event = obj.event;

if (event) {
var response = new Response(this, obj.cid);
Emitter.prototype.emit.call(this, 'event', event, obj.data, response);
} else if (obj.rid != null) {

var eventObject = this._callbackMap[obj.rid];
if (eventObject) {
clearTimeout(eventObject.timeout);
delete eventObject.timeout;
delete this._callbackMap[obj.rid];

if (eventObject.callback) {
var rehydratedError = scErrors.hydrateError(obj.error);
eventObject.callback(rehydratedError, obj.data);
}
if (Array.isArray(obj)) {
var len = obj.length;
for (var i = 0; i < len; i++) {
this._handleEventObject(obj[i], message);
}
} else {
Emitter.prototype.emit.call(this, 'event', 'raw', obj);
this._handleEventObject(obj, message);
}
}
};
Expand Down Expand Up @@ -297,11 +305,8 @@ SCTransport.prototype.emitObject = function (eventObject, options) {
this._callbackMap[eventObject.cid] = eventObject;
}

if (options && options.batch) {
this.sendObjectBatch(simpleEventObject);
} else {
this.sendObject(simpleEventObject);
}
this.sendObject(simpleEventObject, options);

return eventObject.cid || null;
};

Expand Down Expand Up @@ -391,28 +396,39 @@ SCTransport.prototype.serializeObject = function (object) {
return null;
};

SCTransport.prototype.sendObject = function (object) {
var str = this.serializeObject(object);
if (str != null) {
this.send(str);
}
};

SCTransport.prototype.sendObjectBatch = function (object) {
var self = this;

clearTimeout(this._batchTimeout);
this._batchSendList.push(object);
if (this._batchTimeout) {
return;
}

this._batchTimeout = setTimeout(function () {
delete self._batchTimeout;
if (self._batchSendList.length) {
var str = self.serializeObject(self._batchSendList);
if (str != null) {
self.send(str);
}
self._batchSendList = [];
}
}, 0);
}, this.options.pubSubBatchDuration || 0);
};

SCTransport.prototype.sendObjectSingle = function (object) {
var str = this.serializeObject(object);
if (str != null) {
this.send(str);
}
};

SCTransport.prototype.sendObject = function (object, options) {
if (options && options.batch) {
this.sendObjectBatch(object);
} else {
this.sendObjectSingle(object);
}
};

module.exports.SCTransport = SCTransport;
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"component-emitter": "1.2.1",
"linked-list": "0.1.0",
"querystring": "0.2.0",
"sc-channel": "~1.1.0",
"sc-channel": "~1.2.0",
"sc-errors": "~1.3.3",
"sc-formatter": "~3.0.0",
"ws": "3.1.0"
Expand Down

0 comments on commit 9ca1671

Please sign in to comment.