From 8705c4fe35dc8d7eead0ac4778fda528c697a0ec Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Wed, 18 Apr 2018 16:27:13 +0100 Subject: [PATCH 01/23] Fix Doc.prototype.destroy The problem was that unsubscribe re-added the doc to the connection. Now the doc is removed from the connection after unsubscribe. Additionally, we're no longer waiting for the unsubscribe response before executing the callback. It is consistent with Query, unsubscribe can't fail anyway and the subscribed state is updated synchronously on the client side. --- lib/client/doc.js | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 05e17976d..bf128eb51 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -104,10 +104,8 @@ emitter.mixin(Doc); Doc.prototype.destroy = function(callback) { var doc = this; doc.whenNothingPending(function() { + if (doc.wantSubscribe) doc.unsubscribe(); doc.connection._destroyDoc(doc); - if (doc.wantSubscribe) { - return doc.unsubscribe(callback); - } if (callback) callback(); }); }; From af84be65f208856e0933d627464b79ab053a6dc0 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Wed, 18 Apr 2018 16:36:58 +0100 Subject: [PATCH 02/23] Update tested nodejs versions in .travis.yml See See https://github.com/nodejs/Release --- .travis.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1b9165051..66e0be28c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,9 @@ language: node_js node_js: - - 6 - - 5 - - 4 - - 0.10 + - "9" + - "8" + - "6" + - "4" script: "npm run jshint && npm run test-cover" # Send coverage data to Coveralls after_script: "cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js" From 09edf920eedf6d5116efb2271d693a9a59da1517 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Thu, 19 Apr 2018 12:37:24 +0100 Subject: [PATCH 03/23] Add a test --- lib/client/doc.js | 4 +++- test/client/subscribe.js | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index bf128eb51..33621cb9c 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -104,7 +104,9 @@ emitter.mixin(Doc); Doc.prototype.destroy = function(callback) { var doc = this; doc.whenNothingPending(function() { - if (doc.wantSubscribe) doc.unsubscribe(); + if (doc.wantSubscribe) { + doc.unsubscribe(); + } doc.connection._destroyDoc(doc); if (callback) callback(); }); diff --git a/test/client/subscribe.js b/test/client/subscribe.js index 567031d0a..db2bea1b2 100644 --- a/test/client/subscribe.js +++ b/test/client/subscribe.js @@ -405,8 +405,10 @@ describe('client subscribe', function() { }); it('doc destroy stops op updates', function(done) { - var doc = this.backend.connect().get('dogs', 'fido'); - var doc2 = this.backend.connect().get('dogs', 'fido'); + var connection1 = this.backend.connect(); + var connection2 = this.backend.connect(); + var doc = connection1.get('dogs', 'fido'); + var doc2 = connection2.get('dogs', 'fido'); doc.create({age: 3}, function(err) { if (err) return done(err); doc2.subscribe(function(err) { @@ -416,6 +418,7 @@ describe('client subscribe', function() { }); doc2.destroy(function(err) { if (err) return done(err); + expect(connection2.getExisting('dogs', 'fido')).equal(undefined); doc.submitOp({p: ['age'], na: 1}, done); }); }); From 9121bafe48e6a43dfdf29ee7ef9088b30a9b2a08 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Wed, 18 Apr 2018 16:36:58 +0100 Subject: [PATCH 04/23] Update tested nodejs versions in .travis.yml See See https://github.com/nodejs/Release --- .travis.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.travis.yml b/.travis.yml index 1b9165051..66e0be28c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,9 @@ language: node_js node_js: - - 6 - - 5 - - 4 - - 0.10 + - "9" + - "8" + - "6" + - "4" script: "npm run jshint && npm run test-cover" # Send coverage data to Coveralls after_script: "cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js" From 4dbefd14464f234cdc4cb7ee35d3b83ae534cbb2 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Tue, 17 Oct 2017 14:15:42 +0100 Subject: [PATCH 05/23] Add .editorconfig --- .editorconfig | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 000000000..e29f5e504 --- /dev/null +++ b/.editorconfig @@ -0,0 +1,9 @@ +root = true + +[*] +indent_style = space +indent_size = 2 +end_of_line = LF +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true From 2ef8181081b168ce773fca51db248fab518dbe0f Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Mon, 23 Apr 2018 14:51:13 +0100 Subject: [PATCH 06/23] Update mocha I had to add the --exit flag workaround to mocha.opts to make it exit when tests are done. A better long-term solution would be to ensure that nothing keeps node running after all tests are done, see https://boneskull.com/mocha-v4-nears-release/#mochawontforceexit. --- package.json | 2 +- test/mocha.opts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/package.json b/package.json index 47684ed71..5f51224c8 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "expect.js": "^0.3.1", "istanbul": "^0.4.2", "jshint": "^2.9.2", - "mocha": "^3.2.0", + "mocha": "^5.1.1", "sharedb-mingo-memory": "^1.0.0-beta" }, "scripts": { diff --git a/test/mocha.opts b/test/mocha.opts index 34f904192..7ca4707b0 100644 --- a/test/mocha.opts +++ b/test/mocha.opts @@ -1,3 +1,4 @@ --reporter spec --check-leaks --recursive +--exit From 6b687db2744156665608b86f5ed9d59470a28292 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Wed, 18 Apr 2018 16:27:13 +0100 Subject: [PATCH 07/23] Fix Doc.prototype.destroy The problem was that unsubscribe re-added the doc to the connection. Now the doc is removed from the connection after unsubscribe. Additionally, we're no longer waiting for the unsubscribe response before executing the callback. It is consistent with Query, unsubscribe can't fail anyway and the subscribed state is updated synchronously on the client side. --- lib/client/doc.js | 4 ++-- test/client/subscribe.js | 7 +++++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 05e17976d..33621cb9c 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -104,10 +104,10 @@ emitter.mixin(Doc); Doc.prototype.destroy = function(callback) { var doc = this; doc.whenNothingPending(function() { - doc.connection._destroyDoc(doc); if (doc.wantSubscribe) { - return doc.unsubscribe(callback); + doc.unsubscribe(); } + doc.connection._destroyDoc(doc); if (callback) callback(); }); }; diff --git a/test/client/subscribe.js b/test/client/subscribe.js index 567031d0a..db2bea1b2 100644 --- a/test/client/subscribe.js +++ b/test/client/subscribe.js @@ -405,8 +405,10 @@ describe('client subscribe', function() { }); it('doc destroy stops op updates', function(done) { - var doc = this.backend.connect().get('dogs', 'fido'); - var doc2 = this.backend.connect().get('dogs', 'fido'); + var connection1 = this.backend.connect(); + var connection2 = this.backend.connect(); + var doc = connection1.get('dogs', 'fido'); + var doc2 = connection2.get('dogs', 'fido'); doc.create({age: 3}, function(err) { if (err) return done(err); doc2.subscribe(function(err) { @@ -416,6 +418,7 @@ describe('client subscribe', function() { }); doc2.destroy(function(err) { if (err) return done(err); + expect(connection2.getExisting('dogs', 'fido')).equal(undefined); doc.submitOp({p: ['age'], na: 1}, done); }); }); From 1489e36c1e4179b76ba505c8558e6b5bf4619034 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Tue, 24 Apr 2018 13:49:59 +0100 Subject: [PATCH 08/23] Fix hasWritePending in op's callback --- lib/client/doc.js | 5 +++-- test/client/submit.js | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 36 insertions(+), 2 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 33621cb9c..a7d1d845e 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -888,9 +888,10 @@ Doc.prototype._hardRollback = function(err) { }; Doc.prototype._clearInflightOp = function(err) { - var called = callEach(this.inflightOp.callbacks, err); - + var callbacks = this.inflightOp && this.inflightOp.callbacks; this.inflightOp = null; + var called = callbacks && callEach(callbacks, err); + this.flush(); this._emitNothingPending(); diff --git a/test/client/submit.js b/test/client/submit.js index 4e508e66e..4334b57e8 100644 --- a/test/client/submit.js +++ b/test/client/submit.js @@ -1044,6 +1044,39 @@ describe('client submit', function() { }); }); + it('hasWritePending is false when create\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + + it('hasWritePending is false when submimtOp\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + doc.submitOp({p: ['age'], na: 2}, function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + }); + + it('hasWritePending is false when del\'s callback is executed', function(done) { + var doc = this.backend.connect().get('dogs', 'fido'); + doc.create({age: 3}, function(err) { + if (err) return done(err); + doc.del(function(err) { + if (err) return done(err); + expect(doc.hasWritePending()).equal(false); + done(); + }); + }); + }); + describe('type.deserialize', function() { it('can create a new doc', function(done) { var doc = this.backend.connect().get('dogs', 'fido'); From a4499a539cc6961f26174126a8f5d00cc251b757 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Mon, 16 Apr 2018 13:30:49 +0100 Subject: [PATCH 09/23] Implement ephemeral "presence" data sync --- README.md | 19 + lib/agent.js | 48 ++ lib/backend.js | 5 + lib/client/connection.js | 22 + lib/client/doc.js | 384 +++++++++- test/client/presence-type.js | 82 +++ test/client/presence.js | 1277 ++++++++++++++++++++++++++++++++++ test/util.js | 6 + 8 files changed, 1832 insertions(+), 11 deletions(-) create mode 100644 test/client/presence-type.js create mode 100644 test/client/presence.js diff --git a/README.md b/README.md index 69770ed33..3cbdea6e8 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ tracker](https://github.com/share/sharedb/issues). - Realtime synchronization of any JSON document - Concurrent multi-user collaboration +- Realtime synchronization of any ephemeral "presence" data - Synchronous editing API with asynchronous eventual consistency - Realtime query subscriptions - Simple integration with any database - [MongoDB](https://github.com/share/sharedb-mongo), [PostgresQL](https://github.com/share/sharedb-postgres) (experimental) @@ -57,6 +58,10 @@ initial data. Then you can submit editing operations on the document (using OT). Finally you can delete the document with a delete operation. By default, ShareDB stores all operations forever - nothing is truly deleted. +## User presence synchronization + +Presence data represents a user and is automatically synchronized between all clients subscribed to the same document. Its format is defined by the document's [OT Type](https://github.com/ottypes/docs), for example it may contain a user ID and a cursor position in a text document. All clients can modify their own presence data and receive a read-only version of other client's data. Presence data is automatically cleared when a client unsubscribes from the document or disconnects. It is also automatically transformed against applied operations, so that it still makes sense in the context of a modified document, for example a cursor position may be automatically advanced when a user types at the beginning of a text document. + ## Server API ### Initialization @@ -221,6 +226,9 @@ Unique document ID `doc.data` _(Object)_ Document contents. Available after document is fetched or subscribed to. +`doc.presence` _(Object)_ +Each property under `doc.presence` contains presence data shared by a client subscribed to this document. The property name is an empty string for this client's data and connection IDs for other clients' data. + `doc.fetch(function(err) {...})` Populate the fields on `doc` with a snapshot of the document from the server. @@ -250,6 +258,9 @@ An operation was applied to the data. `source` will be `false` for ops received `doc.on('del', function(data, source) {...})` The document was deleted. Document contents before deletion are passed in as an argument. `source` will be `false` for ops received from the server and defaults to `true` for ops generated locally. +`doc.on('presence', function(srcList) {...})` +Presence data has changed. `srcList` is an Array of `doc.presence` property names for which values have changed. + `doc.on('error', function(err) {...})` There was an error fetching the document or applying an operation. @@ -283,6 +294,11 @@ Invokes the given callback function after Note that `whenNothingPending` does NOT wait for pending `model.query()` calls. +`doc.submitPresence(presenceData[, function(err) {...}])` +Set local presence data and publish it for other clients. +`presenceData` structure depends on the document type. +Presence is synchronized only when subscribed to the document. + ### Class: `ShareDB.Query` `query.ready` _(Boolean)_ @@ -358,6 +374,9 @@ Additional fields may be added to the error object for debugging context dependi * 4021 - Invalid client id * 4022 - Database adapter does not support queries * 4023 - Cannot project snapshots of this type +* 4024 - OT Type does not support presence +* 4025 - Not subscribed to document +* 4026 - Presence data superseded ### 5000 - Internal error diff --git a/lib/agent.js b/lib/agent.js index d1a944de4..f04baa2bd 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -1,6 +1,7 @@ var hat = require('hat'); var util = require('./util'); var types = require('./types'); +var ShareDBError = require('./error'); /** * Agent deserializes the wire protocol messages received from the stream and @@ -25,6 +26,9 @@ function Agent(backend, stream) { // Map from queryId -> emitter this.subscribedQueries = {}; + // The max presence sequence number received from the client. + this.maxPresenceSeq = 0; + // We need to track this manually to make sure we don't reply to messages // after the stream was closed. this.closed = false; @@ -98,10 +102,17 @@ Agent.prototype._subscribeToStream = function(collection, id, stream) { console.error('Doc subscription stream error', collection, id, data.error); return; } + if (data.a === 'p') { + // Send other clients' presence data + if (data.src !== agent.clientId) agent.send(data); + return; + } if (agent._isOwnOp(collection, data)) return; agent._sendOp(collection, id, data); }); stream.on('end', function() { + var presence = agent._createPresence(collection, id); + agent.backend.sendPresence(presence); // The op stream is done sending, so release its reference var streams = agent.subscribedDocs[collection]; if (!streams) return; @@ -268,6 +279,13 @@ Agent.prototype._checkRequest = function(request) { // Bulk request if (request.c != null && typeof request.c !== 'string') return 'Invalid collection'; if (typeof request.b !== 'object') return 'Invalid bulk subscribe data'; + } else if (request.a === 'p') { + // Presence + if (typeof request.c !== 'string') return 'Invalid collection'; + if (typeof request.d !== 'string') return 'Invalid id'; + if (typeof request.v !== 'number' || request.v < 0) return 'Invalid version'; + if (typeof request.seq !== 'number' || request.seq <= 0) return 'Invalid seq'; + if (typeof request.r !== 'undefined' && typeof request.r !== 'boolean') return 'Invalid "request reply" value'; } }; @@ -300,6 +318,9 @@ Agent.prototype._handleMessage = function(request, callback) { var op = this._createOp(request); if (!op) return callback({code: 4000, message: 'Invalid op message'}); return this._submit(request.c, request.d, op, callback); + case 'p': + var presence = this._createPresence(request.c, request.d, request.p, request.v, request.r, request.seq); + return this._presence(presence, callback); default: callback({code: 4000, message: 'Invalid or unknown message'}); } @@ -582,3 +603,30 @@ Agent.prototype._createOp = function(request) { return new DeleteOp(src, request.seq, request.v, request.del); } }; + +Agent.prototype._presence = function(presence, callback) { + if (presence.seq <= this.maxPresenceSeq) { + return callback(new ShareDBError(4026, 'Presence data superseded')); + } + this.maxPresenceSeq = presence.seq; + if (!this.subscribedDocs[presence.c] || !this.subscribedDocs[presence.c][presence.d]) { + return callback(new ShareDBError(4025, 'Cannot send presence. Not subscribed to document: ' + presence.c + ' ' + presence.d)); + } + this.backend.sendPresence(presence, function(err) { + if (err) return callback(err); + callback(null, { seq: presence.seq }); + }); +}; + +Agent.prototype._createPresence = function(collection, id, data, version, requestReply, seq) { + return { + a: 'p', + src: this.clientId, + seq: seq != null ? seq : this.maxPresenceSeq, + c: collection, + d: id, + p: data, + v: version, + r: requestReply + }; +}; diff --git a/lib/backend.js b/lib/backend.js index 22791f30b..40a1ca282 100644 --- a/lib/backend.js +++ b/lib/backend.js @@ -515,6 +515,11 @@ Backend.prototype.getChannels = function(collection, id) { ]; }; +Backend.prototype.sendPresence = function(presence, callback) { + var channels = [ this.getDocChannel(presence.c, presence.d) ]; + this.pubsub.publish(channels, presence, callback); +}; + function pluckIds(snapshots) { var ids = []; for (var i = 0; i < snapshots.length; i++) { diff --git a/lib/client/connection.js b/lib/client/connection.js index f4cc298e6..b8d7f1ccc 100644 --- a/lib/client/connection.js +++ b/lib/client/connection.js @@ -243,6 +243,11 @@ Connection.prototype.handleMessage = function(message) { if (doc) doc._handleOp(err, message); return; + case 'p': + var doc = this.getExisting(message.c, message.d); + if (doc) doc._handlePresence(err, message); + return; + default: console.warn('Ignoring unrecognized message', message); } @@ -408,6 +413,23 @@ Connection.prototype.sendOp = function(doc, op) { this.send(message); }; +Connection.prototype.sendPresence = function(doc, data, requestReply) { + // Ensure the doc is registered so that it receives the reply message + this._addDoc(doc); + var message = { + a: 'p', + c: doc.collection, + d: doc.id, + p: data, + v: doc.version || 0, + seq: this.seq++ + }; + if (requestReply) { + message.r = true; + } + this.send(message); +}; + /** * Sends a message down the socket diff --git a/lib/client/doc.js b/lib/client/doc.js index a7d1d845e..e92c4b644 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -28,6 +28,14 @@ var types = require('../types'); * }) * * + * Presence + * -------- + * + * We can associate transient "presence" data with a document, eg caret position, etc. + * The presence data is synchronized on the best-effort basis between clients subscribed to the same document. + * Each client has their own presence data which is read-write. Other clients' data is read-only. + * + * * Events * ------ * @@ -42,6 +50,7 @@ var types = require('../types'); * the data is null. It is passed the data before delteion as an * arguments * - `load ()` Fired when a new snapshot is ingested from a fetch, subscribe, or query + * - `presence ([src])` Fired after the presence data has changed. */ module.exports = Doc; @@ -57,11 +66,37 @@ function Doc(connection, collection, id) { this.type = null; this.data = undefined; + // The current presence data + // Map of src -> presence data + // Local src === '' + this.presence = Object.create(null); + // The presence objects received from the server + // Map of src -> presence + this.receivedPresence = Object.create(null); + // The minimum amount of time to wait before removing processed presence from this.receivedPresence. + // The processed presence is removed to avoid leaking memory, in case peers keep connecting and disconnecting a lot. + // The processed presence is not removed immediately to enable avoiding race conditions, where messages with lower + // sequence number arrive after messages with higher sequence numbers. + this.receivedPresenceTimeout = 60000; + // If set to true, then the next time the local presence is sent, + // all other clients will be asked to reply with their own presence data. + this.requestReplyPresence = true; + // A list of ops sent by the server. These are needed for transforming presence data, + // if we get that presence data for an older version of the document. + // The ops are cached for 1 minute by default, which should be lots, considering that the presence + // data is supposed to be synced in real-time. + this.cachedOps = []; + this.cachedOpsTimeout = 60000; + // The sequence number of the inflight presence request. + this.inflightPresenceSeq = 0; + // Array of callbacks or nulls as placeholders this.inflightFetch = []; this.inflightSubscribe = []; this.inflightUnsubscribe = []; + this.inflightPresence = null; this.pendingFetch = []; + this.pendingPresence = null; // Whether we think we are subscribed on the server. Synchronously set to // false on calls to unsubscribe and disconnect. Should never be true when @@ -108,6 +143,24 @@ Doc.prototype.destroy = function(callback) { doc.unsubscribe(); } doc.connection._destroyDoc(doc); + + // Make sure all presence callbacks are called + var callbacks = []; + if (doc.inflightPresence) { + // This shouldn't be possible but check just in case. + callbacks.push.apply(callbacks, doc.inflightPresence); + doc.inflightPresence = null; + doc.inflightPresenceSeq = 0; + } + if (doc.pendingPresence) { + callbacks.push.apply(callbacks, doc.pendingPresence); + doc.pendingPresence = null; + } + + doc.receivedPresence = Object.create(null); + doc.cachedOps.length = 0; + + callEach(callbacks); if (callback) callback(); }); }; @@ -186,12 +239,14 @@ Doc.prototype.ingestSnapshot = function(snapshot, callback) { if (this.version > snapshot.v) return callback && callback(); this.version = snapshot.v; + this.cachedOps.length = 0; var type = (snapshot.type === undefined) ? types.defaultType : snapshot.type; this._setType(type); this.data = (this.type && this.type.deserialize) ? this.type.deserialize(snapshot.data) : snapshot.data; this.emit('load'); + this._processAllReceivedPresence(); callback && callback(); }; @@ -257,6 +312,7 @@ Doc.prototype._handleSubscribe = function(err, snapshot) { if (this.wantSubscribe) this.subscribed = true; this.ingestSnapshot(snapshot, callback); this._emitNothingPending(); + this.flush(); }; Doc.prototype._handleUnsubscribe = function(err) { @@ -307,6 +363,13 @@ Doc.prototype._handleOp = function(err, message) { return; } + var serverOp = { + src: message.src, + create: !!message.create, + op: message.op, + del: !!message.del + }; + if (this.inflightOp) { var transformErr = transformX(this.inflightOp, message); if (transformErr) return this._hardRollback(transformErr); @@ -318,7 +381,9 @@ Doc.prototype._handleOp = function(err, message) { } this.version++; + this._cacheOp(serverOp); this._otApply(message, false); + this._processAllReceivedPresence(); return; }; @@ -342,7 +407,10 @@ Doc.prototype._onConnectionStateChanged = function() { if (this.inflightUnsubscribe.length) { var callbacks = this.inflightUnsubscribe; this.inflightUnsubscribe = []; + this._pausePresence(); callEach(callbacks); + } else { + this._pausePresence(); } } }; @@ -402,8 +470,10 @@ Doc.prototype.unsubscribe = function(callback) { if (this.connection.canSend) { var isDuplicate = this.connection.sendUnsubscribe(this); pushActionCallback(this.inflightUnsubscribe, isDuplicate, callback); + this._pausePresence(); return; } + this._pausePresence(); if (callback) process.nextTick(callback); }; @@ -426,14 +496,22 @@ function pushActionCallback(inflight, isDuplicate, callback) { // // Only one operation can be in-flight at a time. If an operation is already on // its way, or we're not currently connected, this method does nothing. +// +// If there are no pending ops, this method sends the pending presence data, if possible. Doc.prototype.flush = function() { - // Ignore if we can't send or we are already sending an op - if (!this.connection.canSend || this.inflightOp) return; + if (this.paused) return; - // Send first pending op unless paused - if (!this.paused && this.pendingOps.length) { + if (this.connection.canSend && !this.inflightOp && this.pendingOps.length) { this._sendOp(); } + + if (this.subscribed && !this.inflightPresence && this.pendingPresence && !this.hasWritePending()) { + this.inflightPresence = this.pendingPresence; + this.inflightPresenceSeq = this.connection.seq; + this.pendingPresence = null; + this.connection.sendPresence(this, this.presence[''], this.requestReplyPresence); + this.requestReplyPresence = false; + } }; // Helper function to set op to contain a no-op. @@ -538,6 +616,7 @@ Doc.prototype._otApply = function(op, source) { // Apply the individual op component this.emit('before op', componentOp.op, source); this.data = this.type.apply(this.data, componentOp.op); + this._transformAllPresence(op); this.emit('op', componentOp.op, source); } // Pop whatever was submitted since we started applying this op @@ -550,6 +629,7 @@ Doc.prototype._otApply = function(op, source) { this.emit('before op', op.op, source); // Apply the operation to the local data, mutating it in place this.data = this.type.apply(this.data, op.op); + this._transformAllPresence(op); // Emit an 'op' event once the local data includes the changes from the // op. For locally submitted ops, this will be synchronously with // submission and before the server or other clients have received the op. @@ -566,6 +646,7 @@ Doc.prototype._otApply = function(op, source) { this.type.createDeserialized(op.create.data) : this.type.deserialize(this.type.create(op.create.data)) : this.type.create(op.create.data); + this._transformAllPresence(op); this.emit('create', source); return; } @@ -573,6 +654,7 @@ Doc.prototype._otApply = function(op, source) { if (op.del) { var oldData = this.data; this._setType(null); + this._transformAllPresence(op); this.emit('del', oldData, source); return; } @@ -820,6 +902,7 @@ Doc.prototype.resume = function() { Doc.prototype._opAcknowledged = function(message) { if (this.inflightOp.create) { this.version = message.v; + this.cachedOps.length = 0; } else if (message.v !== this.version) { // We should already be at the same version, because the server should @@ -832,8 +915,15 @@ Doc.prototype._opAcknowledged = function(message) { // The op was committed successfully. Increment the version number this.version++; + this._cacheOp({ + src: this.inflightOp.src, + create: !!this.inflightOp.create, + op: this.inflightOp.op, + del: !!this.inflightOp.del + }); this._clearInflightOp(); + this._processAllReceivedPresence(); }; Doc.prototype._rollback = function(err) { @@ -868,21 +958,45 @@ Doc.prototype._rollback = function(err) { }; Doc.prototype._hardRollback = function(err) { - // Cancel all pending ops and reset if we can't invert - var op = this.inflightOp; - var pending = this.pendingOps; + var callbacks = []; + if (this.inflightPresence) { + callbacks.push.apply(callbacks, this.inflightPresence); + this.inflightPresence = null; + this.inflightPresenceSeq = 0; + } + if (this.pendingPresence) { + callbacks.push.apply(callbacks, this.pendingPresence); + this.pendingPresence = null; + } + if (this.inflightOp) { + callbacks.push.apply(callbacks, this.inflightOp.callbacks); + } + for (var i = 0; i < this.pendingOps.length; i++) { + callbacks.push.apply(callbacks, this.pendingOps[i].callbacks); + } + this._setType(null); this.version = null; this.inflightOp = null; this.pendingOps = []; + this.cachedOps.length = 0; + this.receivedPresence = Object.create(null); + this.requestReplyPresence = true; + + var srcList = Object.keys(this.presence); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._setPresence(src, null)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList); // Fetch the latest from the server to get us back into a working state var doc = this; this.fetch(function() { - var called = op && callEach(op.callbacks, err); - for (var i = 0; i < pending.length; i++) { - callEach(pending[i].callbacks, err); - } + var called = callEach(callbacks, err); if (err && !called) return doc.emit('error', err); }); }; @@ -909,3 +1023,251 @@ function callEach(callbacks, err) { } return called; } + +// *** Presence + +Doc.prototype.submitPresence = function (data, callback) { + if (data != null) { + if (!this.type) { + var err = new ShareDBError(4015, 'Cannot submit presence. Document has not been created. ' + this.collection + '.' + this.id); + if (callback) return callback(err); + return this.emit('error', err); + } + + if (!this.type.createPresence || !this.type.transformPresence) { + var err = new ShareDBError(4024, 'Cannot submit presence. Document\'s type does not support presence. ' + this.collection + '.' + this.id); + if (callback) return callback(err); + return this.emit('error', err); + } + + data = this.type.createPresence(data); + } + + if (!this.pendingPresence) this.pendingPresence = []; + if (callback) this.pendingPresence.push(callback); + this._setPresence('', data, true); + + var doc = this; + process.nextTick(function() { + doc.flush(); + }); +}; + +Doc.prototype._handlePresence = function(err, presence) { + if (!this.subscribed) return; + + var src = presence.src; + if (!src) { + // Handle the ACK for the presence data we submitted. + // this.inflightPresenceSeq would not equal presence.seq after a hard rollback, + // when all callbacks are flushed with an error. + if (this.inflightPresenceSeq === presence.seq) { + var callbacks = this.inflightPresence; + this.inflightPresence = null; + this.inflightPresenceSeq = 0; + var called = callbacks && callEach(callbacks, err); + if (err && !called) this.emit('error', err); + this.flush(); + } + return; + } + + // This shouldn't happen but check just in case. + if (err) return this.emit('error', err); + + if (presence.r && !this.pendingPresence) { + // Another client requested us to share our current presence data + this.pendingPresence = []; + this.flush(); + } + + // Ignore older messages which arrived out of order + if ( + this.receivedPresence[src] && ( + this.receivedPresence[src].seq > presence.seq || + (this.receivedPresence[src].seq === presence.seq && presence.v != null) + ) + ) return; + + this.receivedPresence[src] = presence; + + if (presence.v == null) { + // null version should happen only when the server automatically sends + // null presence for an unsubscribed client + presence.processedAt = Date.now(); + return this._setPresence(src, null, true); + } + + // Get missing ops first, if necessary + if (this.version == null || this.version < presence.v) return this.fetch(); + + this._processReceivedPresence(src, true); +}; + +// If emit is true and presence has changed, emits a presence event. +// Returns true, if presence has changed for src. Otherwise false. +Doc.prototype._processReceivedPresence = function(src, emit) { + if (!src) return false; + var presence = this.receivedPresence[src]; + if (!presence) return false; + + if (presence.processedAt != null) { + if (Date.now() >= presence.processedAt + this.receivedPresenceTimeout) { + // Remove old received and processed presence + delete this.receivedPresence[src]; + } + return false; + } + + if (this.version == null || this.version < presence.v) return false; // keep waiting for the missing snapshot or ops + + if (presence.p == null) { + // Remove presence data as requested + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + if (!this.type || !this.type.createPresence || !this.type.transformPresence) { + // Remove presence data because the document is not created or its type does not support presence + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + if (this.inflightOp && this.inflightOp.op == null) { + // Remove presence data because receivedPresence can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + for (var i = 0; i < this.pendingOps.length; i++) { + if (this.pendingOps[i].op == null) { + // Remove presence data because receivedPresence can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + } + + var startIndex = this.cachedOps.length - (this.version - presence.v); + if (startIndex < 0) { + // Remove presence data because we can't transform receivedPresence + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + + for (var i = startIndex; i < this.cachedOps.length; i++) { + if (this.cachedOps[i].op == null) { + // Remove presence data because receivedPresence can be transformed only against "op", not "create" nor "del" + presence.processedAt = Date.now(); + return this._setPresence(src, null, emit); + } + } + + // Make sure the format of the data is correct + var data = this.type.createPresence(presence.p); + + // Transform against past ops + for (var i = startIndex; i < this.cachedOps.length; i++) { + var op = this.cachedOps[i]; + data = this.type.transformPresence(data, op.op, presence.src === op.src); + } + + // Transform against pending ops + if (this.inflightOp) { + data = this.type.transformPresence(data, this.inflightOp.op, false); + } + + for (var i = 0; i < this.pendingOps.length; i++) { + data = this.type.transformPresence(data, this.pendingOps[i].op, false); + } + + // Set presence data + presence.processedAt = Date.now(); + return this._setPresence(src, data, emit); +}; + +Doc.prototype._processAllReceivedPresence = function() { + var srcList = Object.keys(this.receivedPresence); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._processReceivedPresence(src)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList); +}; + +Doc.prototype._transformPresence = function(src, op) { + var presenceData = this.presence[src]; + if (op.op != null) { + var isOwnOperation = src === (op.src || ''); + presenceData = this.type.transformPresence(presenceData, op.op, isOwnOperation); + } else { + presenceData = null; + } + return this._setPresence(src, presenceData); +}; + +Doc.prototype._transformAllPresence = function(op) { + var srcList = Object.keys(this.presence); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (this._transformPresence(src, op)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList); +}; + +Doc.prototype._pausePresence = function() { + if (this.inflightPresence) { + this.pendingPresence = + this.pendingPresence ? + this.inflightPresence.concat(this.pendingPresence) : + this.inflightPresence; + this.inflightPresence = null; + this.inflightPresenceSeq = 0; + } + this.receivedPresence = Object.create(null); + this.requestReplyPresence = true; + var srcList = Object.keys(this.presence); + var changedSrcList = []; + for (var i = 0; i < srcList.length; i++) { + var src = srcList[i]; + if (src && this._setPresence(src, null)) { + changedSrcList.push(src); + } + } + this._emitPresence(changedSrcList); +}; + +// If emit is true and presence has changed, emits a presence event. +// Returns true, if presence has changed. Otherwise false. +Doc.prototype._setPresence = function(src, data, emit) { + if (data == null) { + if (this.presence[src] == null) return false; + delete this.presence[src]; + } else { + var isPresenceEqual = + this.presence[src] === data || + (this.type.comparePresence && this.type.comparePresence(this.presence[src], data)); + if (isPresenceEqual) return false; + this.presence[src] = data; + } + if (emit) this._emitPresence([ src ]); + return true; +}; + +Doc.prototype._emitPresence = function(srcList) { + if (srcList && srcList.length > 0) { + this.emit('presence', srcList); + } +}; + +Doc.prototype._cacheOp = function(op) { + this.cachedOps.push(op); + setTimeout(function() { + if (this.cachedOps[0] === op) this.cachedOps.shift(); + }.bind(this), this.cachedOpsTimeout); +}; diff --git a/test/client/presence-type.js b/test/client/presence-type.js new file mode 100644 index 000000000..51ad272a0 --- /dev/null +++ b/test/client/presence-type.js @@ -0,0 +1,82 @@ +// A simple type for testing presence, where: +// +// - snapshot is a list +// - operation is { index, value } -> insert value at index in snapshot +// - presence is { index } -> an index in the snapshot +exports.type = { + name: 'wrapped-presence-no-compare', + uri: 'http://sharejs.org/types/wrapped-presence-no-compare', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence, + transformPresence: transformPresence +}; + +// The same as `exports.type` but implements `comparePresence`. +exports.type2 = { + name: 'wrapped-presence-with-compare', + uri: 'http://sharejs.org/types/wrapped-presence-with-compare', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence, + transformPresence: transformPresence, + comparePresence: comparePresence +}; + +// The same as `exports.type` but `presence.index` is unwrapped. +exports.type3 = { + name: 'unwrapped-presence', + uri: 'http://sharejs.org/types/unwrapped-presence', + create: create, + apply: apply, + transform: transform, + createPresence: createPresence2, + transformPresence: transformPresence2 +}; + +function create(data) { + return data || []; +} + +function apply(snapshot, op) { + snapshot.splice(op.index, 0, op.value); + return snapshot; +} + +function transform(op1, op2, side) { + return op1.index < op2.index || (op1.index === op2.index && side === 'left') ? + op1 : + { + index: op1.index + 1, + value: op1.value + }; +} + +function createPresence(data) { + return { index: (data && data.index) | 0 }; +} + +function transformPresence(presence, op, isOwnOperation) { + return presence.index < op.index || (presence.index === op.index && !isOwnOperation) ? + presence : + { + index: presence.index + 1 + }; +} + +function comparePresence(presence1, presence2) { + return presence1 === presence2 || + (presence1 == null && presence2 == null) || + (presence1 != null && presence2 != null && presence1.index === presence2.index); +} + +function createPresence2(data) { + return data | 0; +} + +function transformPresence2(presence, op, isOwnOperation) { + return presence < op.index || (presence === op.index && !isOwnOperation) ? + presence : presence + 1; +} diff --git a/test/client/presence.js b/test/client/presence.js new file mode 100644 index 000000000..271b9b063 --- /dev/null +++ b/test/client/presence.js @@ -0,0 +1,1277 @@ +var async = require('async'); +var util = require('../util'); +var errorHandler = util.errorHandler; +var Backend = require('../../lib/backend'); +var ShareDBError = require('../../lib/error'); +var expect = require('expect.js'); +var types = require('../../lib/types'); +var presenceType = require('./presence-type'); +types.register(presenceType.type); +types.register(presenceType.type2); +types.register(presenceType.type3); + +[ + 'wrapped-presence-no-compare', + 'wrapped-presence-with-compare', + 'unwrapped-presence' +].forEach(function(typeName) { + function p(index) { + return typeName === 'unwrapped-presence' ? index : { index: index }; + } + + describe('client presence (' + typeName + ')', function() { + beforeEach(function() { + this.backend = new Backend(); + this.connection = this.backend.connect(); + this.connection2 = this.backend.connect(); + this.doc = this.connection.get('dogs', 'fido'); + this.doc2 = this.connection2.get('dogs', 'fido'); + }); + + afterEach(function(done) { + this.backend.close(done); + }); + + it('sends presence immediately', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.once('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('sends presence after pending ops', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc.submitOp({ index: 0, value: 'a' }, errorHandler(done)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.once('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('waits for pending ops before processing future presence', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + // A hack to send presence for a future version. + this.doc.version += 2; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), function(err) { + if (err) return done(err); + this.doc.version -= 2; + this.doc.submitOp({ index: 0, value: 'a' }, errorHandler(done)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'c' }), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (own ops, presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'c' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), + this.doc2.submitOp.bind(this.doc2, { index: 2, value: 'c' }), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'c' }), + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (non-own ops, presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'b' }), + this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'a' }), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'c' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (transform against non-op)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.create.bind(this.doc, [], typeName), + this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), + this.doc.del.bind(this.doc), + this.doc.create.bind(this.doc, [ 'b' ], typeName), + function(done) { + this.doc2.once('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'b' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'b' ]); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 2; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('handles presence sent for earlier revisions (no cached ops)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), + this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), + function(done) { + this.doc2.once('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this), + function(done) { + this.doc2.cachedOps = []; + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + // A hack to send presence for an older version. + this.doc.version = 1; + this.doc.data = [ 'a' ]; + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + async.nextTick, // wait for the doc2 presence message to reach doc + function(done) { + this.doc.on('presence', function(srcList) { + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + done(); + }.bind(this)); + this.doc.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + async.nextTick, // wait for the doc2 presence message to reach doc + function(done) { + this.doc.on('presence', function(srcList) { + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + done(); + }.bind(this)); + this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local op (presence.index != op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(2)), + async.nextTick, // wait for the doc2 presence message to reach doc + function(done) { + this.doc.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local op (presence.index != op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(2)), + async.nextTick, // wait for the doc2 presence message to reach doc + function(done) { + this.doc.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against local op (presence.index == op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + async.nextTick, // wait for the doc2 presence message to reach doc + function(done) { + this.doc.on('presence', function(srcList) { + expect(srcList).to.eql([ '' ]); + expect(this.doc.presence['']).to.eql(p(2)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms presence against non-local op (presence.index == op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + async.nextTick, // wait for the doc2 presence message to reach doc + function(done) { + this.doc.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection2.id ]); + expect(this.doc.presence['']).to.eql(p(1)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(2)); + done(); + }.bind(this)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('caches local ops', function(allDone) { + var op = { index: 1, value: 'b' }; + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + function(done) { + expect(this.doc.cachedOps.length).to.equal(3); + expect(this.doc.cachedOps[0].create).to.equal(true); + expect(this.doc.cachedOps[1].op).to.equal(op); + expect(this.doc.cachedOps[2].del).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('caches non-local ops', function(allDone) { + var op = { index: 1, value: 'b' }; + async.series([ + this.doc2.subscribe.bind(this.doc2), + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + async.nextTick, + function(done) { + expect(this.doc2.cachedOps.length).to.equal(3); + expect(this.doc2.cachedOps[0].create).to.equal(true); + expect(this.doc2.cachedOps[1].op).to.eql(op); + expect(this.doc2.cachedOps[2].del).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('removes cached ops', function(allDone) { + var op = { index: 1, value: 'b' }; + this.doc.cachedOpsTimeout = 0; + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + function(done) { + expect(this.doc.cachedOps.length).to.equal(3); + expect(this.doc.cachedOps[0].create).to.equal(true); + expect(this.doc.cachedOps[1].op).to.equal(op); + expect(this.doc.cachedOps[2].del).to.equal(true); + done(); + }.bind(this), + setTimeout, + function(done) { + expect(this.doc.cachedOps.length).to.equal(0); + done(); + }.bind(this) + ], allDone); + }); + + it('removes correct cached ops', function(allDone) { + var op = { index: 1, value: 'b' }; + this.doc.cachedOpsTimeout = 0; + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.submitOp.bind(this.doc, op), + this.doc.del.bind(this.doc), + function(done) { + expect(this.doc.cachedOps.length).to.equal(3); + expect(this.doc.cachedOps[0].create).to.equal(true); + expect(this.doc.cachedOps[1].op).to.equal(op); + expect(this.doc.cachedOps[2].del).to.equal(true); + this.doc.cachedOps.shift(); + this.doc.cachedOps.push({ op: true }); + done(); + }.bind(this), + setTimeout, + function(done) { + expect(this.doc.cachedOps.length).to.equal(1); + expect(this.doc.cachedOps[0].op).to.equal(true); + done(); + }.bind(this) + ], allDone); + }); + + it('requests reply presence when sending presence for the first time', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + if (srcList[0] === '') { + expect(srcList).to.eql([ '' ]); + expect(this.doc2.presence['']).to.eql(p(1)); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + } else { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence['']).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.requestReplyPresence).to.equal(false); + done(); + } + }.bind(this)); + this.doc2.submitPresence(p(1), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence for uncreated document: callback(err)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4015); + done(); + }); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence for uncreated document: emit(err)', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4015); + done(); + }); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence, if type does not support presence: callback(err)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, {}), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4024); + done(); + }); + }.bind(this) + ], allDone); + }); + + it('fails to submit presence, if type does not support presence: emit(err)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, {}), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4024); + done(); + }); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('submits null presence', function(allDone) { + async.series([ + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, null) + ], allDone); + }); + + it('sends presence once, if submitted multiple times synchronously', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(2)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc.submitPresence(p(2), errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('buffers presence until subscribed', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + setTimeout(function() { + this.doc.subscribe(function(err) { + if (err) return done(err); + expect(this.doc2.presence).to.eql({}); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('buffers presence when disconnected', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.connection.close(); + this.doc.submitPresence(p(1), errorHandler(done)); + process.nextTick(function() { + this.backend.connect(this.connection); + this.doc.requestReplyPresence = false; + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('submits presence without a callback', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('cancels pending presence on destroy', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + function(done) { + this.doc.submitPresence(p(0), done); + console.log(!!this.doc.inflightPresence, !!this.doc.pendingPresence); + this.doc.destroy(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('cancels inflight presence on destroy', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.doc.submitPresence(p(0), done); + process.nextTick(function() { + this.doc.destroy(errorHandler(done)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('receives presence after doc is deleted', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + async.nextTick, + function(done) { + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on peer disconnection', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + async.nextTick, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ connectionId ]); + expect(this.doc2.presence).to.not.have.key(connectionId); + expect(this.doc2.presence['']).to.eql(p(1)); + done(); + }.bind(this)); + this.connection.close(); + + // this.doc.requestReplyPresence = false; + // this.doc.submitPresence(p(0), errorHandler(done)); + // this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on own disconnection', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + async.nextTick, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ connectionId ]); + expect(this.doc2.presence).to.not.have.key(connectionId); + expect(this.doc2.presence['']).to.eql(p(1)); + done(); + }.bind(this)); + this.connection2.close(); + + // this.doc.requestReplyPresence = false; + // this.doc.submitPresence(p(0), errorHandler(done)); + // this.doc2.del(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on peer unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + async.nextTick, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ connectionId ]); + expect(this.doc2.presence).to.not.have.key(connectionId); + expect(this.doc2.presence['']).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.unsubscribe(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('clears peer presence on own unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + async.nextTick, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + expect(this.doc2.presence['']).to.eql(p(1)); + + var connectionId = this.connection.id; + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ connectionId ]); + expect(this.doc2.presence).to.not.have.key(connectionId); + expect(this.doc2.presence['']).to.eql(p(1)); + done(); + }.bind(this)); + this.doc2.unsubscribe(errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('pauses inflight and pending presence on disconnect', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + var called = 0; + function callback(err) { + if (err) return done(err); + if (++called === 2) done(); + } + this.doc.submitPresence(p(0), callback); + process.nextTick(function() { + this.doc.submitPresence(p(1), callback); + this.connection.close(); + process.nextTick(function() { + this.backend.connect(this.connection); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('pauses inflight and pending presence on unsubscribe', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + var called = 0; + function callback(err) { + if (err) return done(err); + if (++called === 2) done(); + } + this.doc.submitPresence(p(0), callback); + process.nextTick(function() { + this.doc.submitPresence(p(1), callback); + this.doc.unsubscribe(errorHandler(done)); + process.nextTick(function() { + this.doc.subscribe(errorHandler(done)); + }.bind(this)); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index < op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(0), errorHandler(done)); + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)) + this.doc2.submitOp({ index: 2, value: 'c' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index === op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.submitOp({ index: 1, value: 'c' }, errorHandler(done)) + this.doc2.submitOp({ index: 1, value: 'b' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight and pending ops (presence.index > op.index)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(1), errorHandler(done)); + this.doc2.submitOp({ index: 0, value: 'b' }, errorHandler(done)) + this.doc2.submitOp({ index: 0, value: 'a' }, errorHandler(done)) + }.bind(this) + ], allDone); + }); + + it('transforms received presence against inflight delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + async.nextTick, + function(done) { + this.doc2.on('presence', function(srcList) { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(2), errorHandler(done)); + this.doc2.del(errorHandler(done)); + this.doc2.create([ 'c' ], typeName, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('transforms received presence against a pending delete', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + async.nextTick, + function(done) { + var firstCall = true; + this.doc2.on('presence', function(srcList) { + if (firstCall) return firstCall = false; + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence).to.not.have.key(this.connection.id); + done(); + }.bind(this)); + this.doc.requestReplyPresence = false; + this.doc.submitPresence(p(2), errorHandler(done)); + this.doc2.submitOp({ index: 0, value: 'b' }, errorHandler(done)); + this.doc2.del(errorHandler(done)); + this.doc2.create([ 'c' ], typeName, errorHandler(done)); + }.bind(this) + ], allDone); + }); + + it('emits the same presence only if comparePresence is not implemented (local presence)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(1)), + function(done) { + this.doc.on('presence', function(srcList) { + if (typeName === 'wrapped-presence-no-compare') { + expect(srcList).to.eql([ '' ]); + expect(this.doc.presence['']).to.eql(p(1)); + done(); + } else { + done(new Error('Unexpected presence event')); + } + }.bind(this)); + this.doc.submitPresence(p(1), typeName === 'wrapped-presence-no-compare' ? errorHandler(done) : done); + }.bind(this) + ], allDone); + }); + + it('emits the same presence only if comparePresence is not implemented (non-local presence)', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(1)), + async.nextTick, + function(done) { + this.doc2.on('presence', function(srcList) { + if (typeName === 'wrapped-presence-no-compare') { + expect(srcList).to.eql([ this.connection.id ]); + expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); + done(); + } else { + done(new Error('Unexpected presence event')); + } + }.bind(this)); + this.doc.submitPresence(p(1), typeName === 'wrapped-presence-no-compare' ? errorHandler(done) : done); + }.bind(this) + ], allDone); + }); + + it('returns an error when not subscribed on the server', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.connection.sendUnsubscribe(this.doc); + process.nextTick(done); + }.bind(this), + function(done) { + this.doc.on('error', done); + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4025); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when not subscribed on the server and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + this.connection.sendUnsubscribe(this.doc); + process.nextTick(done); + }.bind(this), + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4025); + done(); + }.bind(this)); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('returns an error when the server gets an old sequence number', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + async.nextTick, + function(done) { + this.doc.on('error', done); + this.connection.seq--; + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when the server gets an old sequence number and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + async.nextTick, + function(done) { + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + }.bind(this)); + this.connection.seq--; + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('returns an error when publishing presence fails', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + async.nextTick, + function(done) { + var sendPresence = this.backend.sendPresence; + this.backend.sendPresence = function(presence, callback) { + if (presence.a === 'p' && presence.v != null) { + return callback(new ShareDBError(-1, 'Test publishing error')); + } + sendPresence.apply(this, arguments); + }; + this.doc.on('error', done); + this.doc.submitPresence(p(0), function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(-1); + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('emits an error when publishing presence fails and no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + async.nextTick, + function(done) { + var sendPresence = this.backend.sendPresence; + this.backend.sendPresence = function(presence, callback) { + if (presence.a === 'p' && presence.v != null) { + return callback(new ShareDBError(-1, 'Test publishing error')); + } + sendPresence.apply(this, arguments); + }; + this.doc.on('error', function(err) { + expect(err).to.be.an(Error); + expect(err.code).to.equal(-1); + done(); + }.bind(this)); + this.doc.submitPresence(p(0)); + }.bind(this) + ], allDone); + }); + + it('clears presence on hard rollback and emits an error', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'b', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + async.nextTick, + function(done) { + // A hack to allow testing of hard rollback of both inflight and pending presence. + var doc = this.doc; + var _handlePresence = this.doc._handlePresence; + this.doc._handlePresence = function(err, presence) { + setTimeout(function() { + _handlePresence.call(doc, err, presence); + }); + }; + + this.doc.submitPresence(p(1)); // inflightPresence + process.nextTick(function() { + this.doc.submitPresence(p(2)); // pendingPresence + + var presenceEmitted = false; + this.doc.on('presence', function(srcList) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + }.bind(this)); + + this.doc.on('error', function(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + done(); + }.bind(this)); + + // send an invalid op + this.doc._submit({ index: 3, value: 'b' }, true); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('clears presence on hard rollback and executes all callbacks', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a', 'b', 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(0)), + async.nextTick, + function(done) { + // A hack to allow testing of hard rollback of both inflight and pending presence. + var doc = this.doc; + var _handlePresence = this.doc._handlePresence; + this.doc._handlePresence = function(err, presence) { + setTimeout(function() { + _handlePresence.call(doc, err, presence); + }); + }; + + var presenceEmitted = false; + var called = 0; + function callback(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + if (++called < 3) return; + done(); + } + this.doc.submitPresence(p(1), callback); // inflightPresence + process.nextTick(function() { + this.doc.submitPresence(p(2), callback); // pendingPresence + + this.doc.on('presence', function(srcList) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + }.bind(this)); + this.doc.on('error', done); + + // send an invalid op + this.doc._submit({ index: 3, value: 'b' }, true, callback); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + function testReceivedMessageExpiry(expireCache, reduceSequence) { + return function(allDone) { + var lastPresence = null; + var handleMessage = this.connection.handleMessage; + this.connection.handleMessage = function(message) { + if (message.a === 'p' && message.src) { + lastPresence = JSON.parse(JSON.stringify(message)); + } + return handleMessage.apply(this, arguments); + }; + if (expireCache) { + this.doc.receivedPresenceTimeout = 0; + } + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + function(done) { + this.doc2.requestReplyPresence = false; + this.doc2.submitPresence(p(0), done); + }.bind(this), + async.nextTick, // wait for presence to reach this.doc + this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), // forces processing of all received presence + async.nextTick, // wait for op to reach this.doc + function(done) { + expect(this.doc.data).to.eql([ 'a', 'b' ]); + expect(this.doc.presence[this.connection2.id]).to.eql(p(0)); + // Replay the `lastPresence` with modified payload. + lastPresence.p = p(1); + lastPresence.v++; // +1 to account for the op above + if (reduceSequence) { + lastPresence.seq--; + } + this.connection.handleMessage(lastPresence); + process.nextTick(done); + }.bind(this), + function(done) { + expect(this.doc.presence[this.connection2.id]).to.eql(expireCache ? p(1) : p(0)); + process.nextTick(done); + }.bind(this) + ], allDone); + }; + } + + it('ignores an old message (cache not expired, presence.seq === cachedPresence.seq)', testReceivedMessageExpiry(false, false)); + it('ignores an old message (cache not expired, presence.seq < cachedPresence.seq)', testReceivedMessageExpiry(false, true)); + it('processes an old message (cache expired, presence.seq === cachedPresence.seq)', testReceivedMessageExpiry(true, false)); + it('processes an old message (cache expired, presence.seq < cachedPresence.seq)', testReceivedMessageExpiry(true, true)); + }); +}); diff --git a/test/util.js b/test/util.js index 508f81a00..dfbfc0b8f 100644 --- a/test/util.js +++ b/test/util.js @@ -14,3 +14,9 @@ exports.pluck = function(docs, key) { } return values; }; + +exports.errorHandler = function(callback) { + return function(err) { + if (err) callback(err); + }; +}; From 33c72644521c82bf528344b9b6f95bb6debd26d4 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Fri, 27 Apr 2018 12:53:46 +0100 Subject: [PATCH 10/23] Execute some callbacks asynchronously --- lib/agent.js | 8 ++++++-- lib/client/doc.js | 18 ++++++++++++------ 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/lib/agent.js b/lib/agent.js index f04baa2bd..ac9c12d70 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -606,11 +606,15 @@ Agent.prototype._createOp = function(request) { Agent.prototype._presence = function(presence, callback) { if (presence.seq <= this.maxPresenceSeq) { - return callback(new ShareDBError(4026, 'Presence data superseded')); + return process.nextTick(function() { + callback(new ShareDBError(4026, 'Presence data superseded')); + }); } this.maxPresenceSeq = presence.seq; if (!this.subscribedDocs[presence.c] || !this.subscribedDocs[presence.c][presence.d]) { - return callback(new ShareDBError(4025, 'Cannot send presence. Not subscribed to document: ' + presence.c + ' ' + presence.d)); + return process.nextTick(function() { + callback(new ShareDBError(4025, 'Cannot send presence. Not subscribed to document: ' + presence.c + ' ' + presence.d)); + }); } this.backend.sendPresence(presence, function(err) { if (err) return callback(err); diff --git a/lib/client/doc.js b/lib/client/doc.js index e92c4b644..1900b2d68 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -1029,15 +1029,21 @@ function callEach(callbacks, err) { Doc.prototype.submitPresence = function (data, callback) { if (data != null) { if (!this.type) { - var err = new ShareDBError(4015, 'Cannot submit presence. Document has not been created. ' + this.collection + '.' + this.id); - if (callback) return callback(err); - return this.emit('error', err); + var doc = this; + return process.nextTick(function() { + var err = new ShareDBError(4015, 'Cannot submit presence. Document has not been created. ' + doc.collection + '.' + doc.id); + if (callback) return callback(err); + doc.emit('error', err); + }); } if (!this.type.createPresence || !this.type.transformPresence) { - var err = new ShareDBError(4024, 'Cannot submit presence. Document\'s type does not support presence. ' + this.collection + '.' + this.id); - if (callback) return callback(err); - return this.emit('error', err); + var doc = this; + return process.nextTick(function() { + var err = new ShareDBError(4024, 'Cannot submit presence. Document\'s type does not support presence. ' + doc.collection + '.' + doc.id); + if (callback) return callback(err); + doc.emit('error', err); + }); } data = this.type.createPresence(data); From 8ff4b3335f3055ae3d3cefa7ee1773a3656919ba Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Mon, 30 Apr 2018 11:42:50 +0100 Subject: [PATCH 11/23] Don't send presence unnecessarily --- lib/client/doc.js | 14 ++++++++--- test/client/presence.js | 55 ++++++++++++++++++++++++++++++++++++++++- 2 files changed, 65 insertions(+), 4 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 1900b2d68..62d8e17fa 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -1049,9 +1049,17 @@ Doc.prototype.submitPresence = function (data, callback) { data = this.type.createPresence(data); } - if (!this.pendingPresence) this.pendingPresence = []; - if (callback) this.pendingPresence.push(callback); - this._setPresence('', data, true); + if (this._setPresence('', data, true) || this.pendingPresence || this.inflightPresence) { + if (!this.pendingPresence) { + this.pendingPresence = []; + } + if (callback) { + this.pendingPresence.push(callback); + } + + } else if (callback) { + process.nextTick(callback); + } var doc = this; process.nextTick(function() { diff --git a/test/client/presence.js b/test/client/presence.js index 271b9b063..90371cf6d 100644 --- a/test/client/presence.js +++ b/test/client/presence.js @@ -1060,7 +1060,7 @@ types.register(presenceType.type3); function(done) { this.doc.on('error', done); this.connection.seq--; - this.doc.submitPresence(p(0), function(err) { + this.doc.submitPresence(p(1), function(err) { expect(err).to.be.an(Error); expect(err.code).to.equal(4026); done(); @@ -1082,7 +1082,60 @@ types.register(presenceType.type3); done(); }.bind(this)); this.connection.seq--; + this.doc.submitPresence(p(1)); + }.bind(this) + ], allDone); + }); + + it('does not publish presence unnecessarily', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + async.nextTick, + function(done) { + this.doc.on('error', done); + // Decremented sequence number would cause the server to return an error, however, + // the message won't be sent to the server at all because the presence data has not changed. + this.connection.seq--; + this.doc.submitPresence(p(0), function(err) { + if (typeName === 'wrapped-presence-no-compare') { + // The OT type does not support comparing presence. + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + } else { + expect(err).to.not.be.ok(); + } + done(); + }.bind(this)); + }.bind(this) + ], allDone); + }); + + it('does not publish presence unnecessarily when no callback is provided', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'c' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc.submitPresence.bind(this.doc, p(0)), + async.nextTick, + function(done) { + this.doc.on('error', function(err) { + if (typeName === 'wrapped-presence-no-compare') { + // The OT type does not support comparing presence. + expect(err).to.be.an(Error); + expect(err.code).to.equal(4026); + done(); + } else { + done(err); + } + }.bind(this)); + // Decremented sequence number would cause the server to return an error, however, + // the message won't be sent to the server at all because the presence data has not changed. + this.connection.seq--; this.doc.submitPresence(p(0)); + if (typeName !== 'wrapped-presence-no-compare') { + process.nextTick(done); + } }.bind(this) ], allDone); }); From 0ff380dda1c6263a31bd3878e73283424877a36e Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Mon, 30 Apr 2018 12:06:54 +0100 Subject: [PATCH 12/23] Re-sync presence after re-subscribe and re-connect --- lib/client/doc.js | 2 ++ test/client/presence.js | 51 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+) diff --git a/lib/client/doc.js b/lib/client/doc.js index 62d8e17fa..fe689ef2b 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -1242,6 +1242,8 @@ Doc.prototype._pausePresence = function() { this.inflightPresence; this.inflightPresence = null; this.inflightPresenceSeq = 0; + } else if (!this.pendingPresence && this.presence[''] != null) { + this.pendingPresence = []; } this.receivedPresence = Object.create(null); this.requestReplyPresence = true; diff --git a/test/client/presence.js b/test/client/presence.js index 90371cf6d..ea9c14caf 100644 --- a/test/client/presence.js +++ b/test/client/presence.js @@ -869,6 +869,57 @@ types.register(presenceType.type3); ], allDone); }); + it('re-synchronizes presence after reconnecting', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + async.nextTick, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + this.connection.close(); + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + this.backend.connect(this.connection); + process.nextTick(done); + }.bind(this), + setTimeout, // wait for re-sync + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + process.nextTick(done); + }.bind(this) + ], allDone); + }); + + it('re-synchronizes presence after resubscribing', function(allDone) { + async.series([ + this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + this.doc2.subscribe.bind(this.doc2), + this.doc.submitPresence.bind(this.doc, p(0)), + this.doc2.submitPresence.bind(this.doc2, p(1)), + async.nextTick, + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + this.doc.unsubscribe(errorHandler(done)); + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + this.doc.subscribe(done); + }.bind(this), + setTimeout, // wait for re-sync + function(done) { + expect(this.doc.presence['']).to.eql(p(0)); + expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); + process.nextTick(done); + }.bind(this) + ], allDone); + }); + it('transforms received presence against inflight and pending ops (presence.index < op.index)', function(allDone) { async.series([ this.doc.create.bind(this.doc, [ 'a' ], typeName), From d67dd6a777661fad628ff84fa869f2a96d6fe6b9 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Tue, 1 May 2018 14:35:24 +0100 Subject: [PATCH 13/23] Emit presence asynchronously --- lib/client/doc.js | 5 +- test/client/presence.js | 130 +++++++++++++++++++++------------------- 2 files changed, 71 insertions(+), 64 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index fe689ef2b..f84ce65e1 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -1277,7 +1277,10 @@ Doc.prototype._setPresence = function(src, data, emit) { Doc.prototype._emitPresence = function(srcList) { if (srcList && srcList.length > 0) { - this.emit('presence', srcList); + var doc = this; + process.nextTick(function() { + doc.emit('presence', srcList); + }); } }; diff --git a/test/client/presence.js b/test/client/presence.js index ea9c14caf..b5cec497a 100644 --- a/test/client/presence.js +++ b/test/client/presence.js @@ -295,7 +295,7 @@ types.register(presenceType.type3); this.doc.version = 1; this.doc.data = [ 'a' ]; this.doc.requestReplyPresence = false; - this.doc.submitPresence(p(0), errorHandler(done)); + this.doc.submitPresence(p(1), errorHandler(done)); }.bind(this) ], allDone); }); @@ -307,7 +307,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(0)), - async.nextTick, // wait for the doc2 presence message to reach doc + setTimeout, function(done) { this.doc.on('presence', function(srcList) { expect(srcList.sort()).to.eql([ '', this.connection2.id ]); @@ -327,7 +327,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(0)), - async.nextTick, // wait for the doc2 presence message to reach doc + setTimeout, function(done) { this.doc.on('presence', function(srcList) { expect(srcList.sort()).to.eql([ '', this.connection2.id ]); @@ -347,7 +347,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(2)), - async.nextTick, // wait for the doc2 presence message to reach doc + setTimeout, function(done) { this.doc.on('presence', function(srcList) { expect(srcList).to.eql([ this.connection2.id ]); @@ -367,7 +367,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(2)), - async.nextTick, // wait for the doc2 presence message to reach doc + setTimeout, function(done) { this.doc.on('presence', function(srcList) { expect(srcList).to.eql([ this.connection2.id ]); @@ -387,7 +387,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(1)), this.doc2.submitPresence.bind(this.doc2, p(1)), - async.nextTick, // wait for the doc2 presence message to reach doc + setTimeout, function(done) { this.doc.on('presence', function(srcList) { expect(srcList).to.eql([ '' ]); @@ -407,7 +407,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(1)), this.doc2.submitPresence.bind(this.doc2, p(1)), - async.nextTick, // wait for the doc2 presence message to reach doc + setTimeout, function(done) { this.doc.on('presence', function(srcList) { expect(srcList).to.eql([ this.connection2.id ]); @@ -443,7 +443,7 @@ types.register(presenceType.type3); this.doc.create.bind(this.doc, [ 'a' ], typeName), this.doc.submitOp.bind(this.doc, op), this.doc.del.bind(this.doc), - async.nextTick, + setTimeout, function(done) { expect(this.doc2.cachedOps.length).to.equal(3); expect(this.doc2.cachedOps[0].create).to.equal(true); @@ -698,7 +698,7 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), - async.nextTick, + setTimeout, function(done) { expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); this.doc2.on('presence', function(srcList) { @@ -720,7 +720,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(1)), - async.nextTick, + setTimeout, function(done) { expect(this.doc.presence['']).to.eql(p(0)); expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); @@ -750,7 +750,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(1)), - async.nextTick, + setTimeout, function(done) { expect(this.doc.presence['']).to.eql(p(0)); expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); @@ -780,7 +780,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(1)), - async.nextTick, + setTimeout, function(done) { expect(this.doc.presence['']).to.eql(p(0)); expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); @@ -806,7 +806,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(1)), - async.nextTick, + setTimeout, function(done) { expect(this.doc.presence['']).to.eql(p(0)); expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); @@ -876,7 +876,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(1)), - async.nextTick, + setTimeout, function(done) { expect(this.doc.presence['']).to.eql(p(0)); expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); @@ -902,7 +902,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(1)), - async.nextTick, + setTimeout, function(done) { expect(this.doc.presence['']).to.eql(p(0)); expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); @@ -983,7 +983,7 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(1)), - async.nextTick, + setTimeout, function(done) { this.doc2.on('presence', function(srcList) { expect(srcList).to.eql([ this.connection.id ]); @@ -1004,7 +1004,7 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(1)), - async.nextTick, + setTimeout, function(done) { var firstCall = true; this.doc2.on('presence', function(srcList) { @@ -1048,7 +1048,7 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(1)), - async.nextTick, + setTimeout, function(done) { this.doc2.on('presence', function(srcList) { if (typeName === 'wrapped-presence-no-compare') { @@ -1107,7 +1107,7 @@ types.register(presenceType.type3); this.doc.create.bind(this.doc, [ 'c' ], typeName), this.doc.subscribe.bind(this.doc), this.doc.submitPresence.bind(this.doc, p(0)), - async.nextTick, + setTimeout, function(done) { this.doc.on('error', done); this.connection.seq--; @@ -1125,7 +1125,7 @@ types.register(presenceType.type3); this.doc.create.bind(this.doc, [ 'c' ], typeName), this.doc.subscribe.bind(this.doc), this.doc.submitPresence.bind(this.doc, p(0)), - async.nextTick, + setTimeout, function(done) { this.doc.on('error', function(err) { expect(err).to.be.an(Error); @@ -1143,7 +1143,7 @@ types.register(presenceType.type3); this.doc.create.bind(this.doc, [ 'c' ], typeName), this.doc.subscribe.bind(this.doc), this.doc.submitPresence.bind(this.doc, p(0)), - async.nextTick, + setTimeout, function(done) { this.doc.on('error', done); // Decremented sequence number would cause the server to return an error, however, @@ -1168,7 +1168,7 @@ types.register(presenceType.type3); this.doc.create.bind(this.doc, [ 'c' ], typeName), this.doc.subscribe.bind(this.doc), this.doc.submitPresence.bind(this.doc, p(0)), - async.nextTick, + setTimeout, function(done) { this.doc.on('error', function(err) { if (typeName === 'wrapped-presence-no-compare') { @@ -1195,7 +1195,7 @@ types.register(presenceType.type3); async.series([ this.doc.create.bind(this.doc, [ 'c' ], typeName), this.doc.subscribe.bind(this.doc), - async.nextTick, + setTimeout, function(done) { var sendPresence = this.backend.sendPresence; this.backend.sendPresence = function(presence, callback) { @@ -1218,7 +1218,7 @@ types.register(presenceType.type3); async.series([ this.doc.create.bind(this.doc, [ 'c' ], typeName), this.doc.subscribe.bind(this.doc), - async.nextTick, + setTimeout, function(done) { var sendPresence = this.backend.sendPresence; this.backend.sendPresence = function(presence, callback) { @@ -1244,7 +1244,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(0)), - async.nextTick, + setTimeout, function(done) { // A hack to allow testing of hard rollback of both inflight and pending presence. var doc = this.doc; @@ -1254,30 +1254,31 @@ types.register(presenceType.type3); _handlePresence.call(doc, err, presence); }); }; + process.nextTick(done); + }.bind(this), + this.doc.submitPresence.bind(this.doc, p(1)), // inflightPresence + process.nextTick, // wait for "presence" event + this.doc.submitPresence.bind(this.doc, p(2)), // pendingPresence + process.nextTick, // wait for "presence" event + function(done) { + var presenceEmitted = false; + this.doc.on('presence', function(srcList) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + }.bind(this)); - this.doc.submitPresence(p(1)); // inflightPresence - process.nextTick(function() { - this.doc.submitPresence(p(2)); // pendingPresence - - var presenceEmitted = false; - this.doc.on('presence', function(srcList) { - expect(presenceEmitted).to.equal(false); - presenceEmitted = true; - expect(srcList.sort()).to.eql([ '', this.connection2.id ]); - expect(this.doc.presence).to.not.have.key(''); - expect(this.doc.presence).to.not.have.key(this.connection2.id); - }.bind(this)); - - this.doc.on('error', function(err) { - expect(presenceEmitted).to.equal(true); - expect(err).to.be.an(Error); - expect(err.code).to.equal(4000); - done(); - }.bind(this)); - - // send an invalid op - this.doc._submit({ index: 3, value: 'b' }, true); + this.doc.on('error', function(err) { + expect(presenceEmitted).to.equal(true); + expect(err).to.be.an(Error); + expect(err.code).to.equal(4000); + done(); }.bind(this)); + + // send an invalid op + this.doc._submit({}, true); }.bind(this) ], allDone); }); @@ -1289,7 +1290,7 @@ types.register(presenceType.type3); this.doc2.subscribe.bind(this.doc2), this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.submitPresence.bind(this.doc2, p(0)), - async.nextTick, + setTimeout, function(done) { // A hack to allow testing of hard rollback of both inflight and pending presence. var doc = this.doc; @@ -1299,7 +1300,9 @@ types.register(presenceType.type3); _handlePresence.call(doc, err, presence); }); }; - + process.nextTick(done); + }.bind(this), + function(done) { var presenceEmitted = false; var called = 0; function callback(err) { @@ -1310,20 +1313,21 @@ types.register(presenceType.type3); done(); } this.doc.submitPresence(p(1), callback); // inflightPresence - process.nextTick(function() { + process.nextTick(function() { // wait for presence event this.doc.submitPresence(p(2), callback); // pendingPresence - - this.doc.on('presence', function(srcList) { - expect(presenceEmitted).to.equal(false); - presenceEmitted = true; - expect(srcList.sort()).to.eql([ '', this.connection2.id ]); - expect(this.doc.presence).to.not.have.key(''); - expect(this.doc.presence).to.not.have.key(this.connection2.id); + process.nextTick(function() { // wait for presence event + this.doc.on('presence', function(srcList) { + expect(presenceEmitted).to.equal(false); + presenceEmitted = true; + expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(this.doc.presence).to.not.have.key(''); + expect(this.doc.presence).to.not.have.key(this.connection2.id); + }.bind(this)); + this.doc.on('error', done); + + // send an invalid op + this.doc._submit({ index: 3, value: 'b' }, true, callback); }.bind(this)); - this.doc.on('error', done); - - // send an invalid op - this.doc._submit({ index: 3, value: 'b' }, true, callback); }.bind(this)); }.bind(this) ], allDone); @@ -1350,9 +1354,9 @@ types.register(presenceType.type3); this.doc2.requestReplyPresence = false; this.doc2.submitPresence(p(0), done); }.bind(this), - async.nextTick, // wait for presence to reach this.doc + setTimeout, this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), // forces processing of all received presence - async.nextTick, // wait for op to reach this.doc + setTimeout, function(done) { expect(this.doc.data).to.eql([ 'a', 'b' ]); expect(this.doc.presence[this.connection2.id]).to.eql(p(0)); From e8ec2158a46c6cc9c3e1f90c5909723e1acb5580 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Wed, 9 May 2018 12:53:18 +0100 Subject: [PATCH 14/23] Add `submitted` param to `presence` event --- README.md | 4 +- lib/client/doc.js | 14 ++--- test/client/presence.js | 130 +++++++++++++++++++++++++--------------- 3 files changed, 92 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 3cbdea6e8..2c6dc1293 100644 --- a/README.md +++ b/README.md @@ -258,8 +258,8 @@ An operation was applied to the data. `source` will be `false` for ops received `doc.on('del', function(data, source) {...})` The document was deleted. Document contents before deletion are passed in as an argument. `source` will be `false` for ops received from the server and defaults to `true` for ops generated locally. -`doc.on('presence', function(srcList) {...})` -Presence data has changed. `srcList` is an Array of `doc.presence` property names for which values have changed. +`doc.on('presence', function(srcList, submitted) {...})` +Presence data has changed. `srcList` is an Array of `doc.presence` property names for which values have changed. `submitted` is `true`, if the event is the result of new presence data being submitted by the local or remote user, otherwise it is `false` - eg if the presence data was transformed against an operation or was cleared on unsubscribe, disconnect or roll-back. `doc.on('error', function(err) {...})` There was an error fetching the document or applying an operation. diff --git a/lib/client/doc.js b/lib/client/doc.js index f84ce65e1..fa35b4926 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -991,7 +991,7 @@ Doc.prototype._hardRollback = function(err) { changedSrcList.push(src); } } - this._emitPresence(changedSrcList); + this._emitPresence(changedSrcList, false); // Fetch the latest from the server to get us back into a working state var doc = this; @@ -1208,7 +1208,7 @@ Doc.prototype._processAllReceivedPresence = function() { changedSrcList.push(src); } } - this._emitPresence(changedSrcList); + this._emitPresence(changedSrcList, true); }; Doc.prototype._transformPresence = function(src, op) { @@ -1231,7 +1231,7 @@ Doc.prototype._transformAllPresence = function(op) { changedSrcList.push(src); } } - this._emitPresence(changedSrcList); + this._emitPresence(changedSrcList, false); }; Doc.prototype._pausePresence = function() { @@ -1255,7 +1255,7 @@ Doc.prototype._pausePresence = function() { changedSrcList.push(src); } } - this._emitPresence(changedSrcList); + this._emitPresence(changedSrcList, false); }; // If emit is true and presence has changed, emits a presence event. @@ -1271,15 +1271,15 @@ Doc.prototype._setPresence = function(src, data, emit) { if (isPresenceEqual) return false; this.presence[src] = data; } - if (emit) this._emitPresence([ src ]); + if (emit) this._emitPresence([ src ], true); return true; }; -Doc.prototype._emitPresence = function(srcList) { +Doc.prototype._emitPresence = function(srcList, submitted) { if (srcList && srcList.length > 0) { var doc = this; process.nextTick(function() { - doc.emit('presence', srcList); + doc.emit('presence', srcList, submitted); }); } }; diff --git a/test/client/presence.js b/test/client/presence.js index b5cec497a..3ddc86bff 100644 --- a/test/client/presence.js +++ b/test/client/presence.js @@ -40,8 +40,9 @@ types.register(presenceType.type3); function(done) { this.doc.requestReplyPresence = false; this.doc.submitPresence(p(1), errorHandler(done)); - this.doc2.once('presence', function(srcList) { + this.doc2.once('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([]); expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); done(); @@ -60,8 +61,9 @@ types.register(presenceType.type3); this.doc.submitOp({ index: 1, value: 'b' }, errorHandler(done)); this.doc.requestReplyPresence = false; this.doc.submitPresence(p(1), errorHandler(done)); - this.doc2.once('presence', function(srcList) { + this.doc2.once('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); done(); @@ -76,8 +78,9 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); done(); @@ -103,8 +106,9 @@ types.register(presenceType.type3); this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); done(); @@ -126,8 +130,9 @@ types.register(presenceType.type3); this.doc.submitOp.bind(this.doc, { index: 1, value: 'c' }), this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); done(); @@ -149,8 +154,9 @@ types.register(presenceType.type3); this.doc.submitOp.bind(this.doc, { index: 0, value: 'b' }), this.doc.submitOp.bind(this.doc, { index: 0, value: 'a' }), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); done(); @@ -172,8 +178,9 @@ types.register(presenceType.type3); this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), this.doc2.submitOp.bind(this.doc2, { index: 2, value: 'c' }), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); done(); @@ -195,8 +202,9 @@ types.register(presenceType.type3); this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'c' }), this.doc2.submitOp.bind(this.doc2, { index: 1, value: 'b' }), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); done(); @@ -218,8 +226,9 @@ types.register(presenceType.type3); this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'b' }), this.doc2.submitOp.bind(this.doc2, { index: 0, value: 'a' }), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); done(); @@ -242,8 +251,9 @@ types.register(presenceType.type3); this.doc.del.bind(this.doc), this.doc.create.bind(this.doc, [ 'b' ], typeName), function(done) { - this.doc2.once('presence', function(srcList) { + this.doc2.once('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'b' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); done(); @@ -252,8 +262,9 @@ types.register(presenceType.type3); this.doc.submitPresence(p(0), errorHandler(done)); }.bind(this), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'b' ]); expect(this.doc2.presence).to.not.have.key(this.connection.id); done(); @@ -274,8 +285,9 @@ types.register(presenceType.type3); this.doc.submitOp.bind(this.doc, { index: 1, value: 'b' }), this.doc.submitOp.bind(this.doc, { index: 2, value: 'c' }), function(done) { - this.doc2.once('presence', function(srcList) { + this.doc2.once('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); done(); @@ -285,8 +297,9 @@ types.register(presenceType.type3); }.bind(this), function(done) { this.doc2.cachedOps = []; - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.data).to.eql([ 'a', 'b', 'c' ]); expect(this.doc2.presence).to.not.have.key(this.connection.id); done(); @@ -309,8 +322,9 @@ types.register(presenceType.type3); this.doc2.submitPresence.bind(this.doc2, p(0)), setTimeout, function(done) { - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); expect(this.doc.presence).to.not.have.key(''); expect(this.doc.presence).to.not.have.key(this.connection2.id); done(); @@ -329,8 +343,9 @@ types.register(presenceType.type3); this.doc2.submitPresence.bind(this.doc2, p(0)), setTimeout, function(done) { - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); expect(this.doc.presence).to.not.have.key(''); expect(this.doc.presence).to.not.have.key(this.connection2.id); done(); @@ -349,8 +364,9 @@ types.register(presenceType.type3); this.doc2.submitPresence.bind(this.doc2, p(2)), setTimeout, function(done) { - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); expect(this.doc.presence['']).to.eql(p(0)); expect(this.doc.presence[this.connection2.id]).to.eql(p(3)); done(); @@ -369,8 +385,9 @@ types.register(presenceType.type3); this.doc2.submitPresence.bind(this.doc2, p(2)), setTimeout, function(done) { - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); expect(this.doc.presence['']).to.eql(p(0)); expect(this.doc.presence[this.connection2.id]).to.eql(p(3)); done(); @@ -389,8 +406,9 @@ types.register(presenceType.type3); this.doc2.submitPresence.bind(this.doc2, p(1)), setTimeout, function(done) { - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(false); expect(this.doc.presence['']).to.eql(p(2)); expect(this.doc.presence[this.connection2.id]).to.eql(p(1)); done(); @@ -409,8 +427,9 @@ types.register(presenceType.type3); this.doc2.submitPresence.bind(this.doc2, p(1)), setTimeout, function(done) { - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection2.id ]); + expect(submitted).to.equal(false); expect(this.doc.presence['']).to.eql(p(1)); expect(this.doc.presence[this.connection2.id]).to.eql(p(2)); done(); @@ -508,9 +527,10 @@ types.register(presenceType.type3); this.doc.submitPresence.bind(this.doc, p(0)), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { if (srcList[0] === '') { expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(true); expect(this.doc2.presence['']).to.eql(p(1)); expect(this.doc2.presence).to.not.have.key(this.connection.id); } else { @@ -595,8 +615,9 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.presence[this.connection.id]).to.eql(p(2)); done(); }.bind(this)); @@ -613,8 +634,9 @@ types.register(presenceType.type3); this.doc.create.bind(this.doc, [ 'a' ], typeName), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); done(); }.bind(this)); @@ -636,8 +658,9 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); done(); }.bind(this)); @@ -657,8 +680,9 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); done(); }.bind(this)); @@ -701,13 +725,16 @@ types.register(presenceType.type3); setTimeout, function(done) { expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); expect(this.doc2.presence).to.not.have.key(this.connection.id); done(); }.bind(this)); this.doc.requestReplyPresence = false; - this.doc.submitPresence(p(0), errorHandler(done)); + this.doc.submitPresence(p(1), errorHandler(done)); this.doc2.del(errorHandler(done)); }.bind(this) ], allDone); @@ -728,17 +755,14 @@ types.register(presenceType.type3); expect(this.doc2.presence['']).to.eql(p(1)); var connectionId = this.connection.id; - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(true); expect(this.doc2.presence).to.not.have.key(connectionId); expect(this.doc2.presence['']).to.eql(p(1)); done(); }.bind(this)); this.connection.close(); - - // this.doc.requestReplyPresence = false; - // this.doc.submitPresence(p(0), errorHandler(done)); - // this.doc2.del(errorHandler(done)); }.bind(this) ], allDone); }); @@ -758,17 +782,14 @@ types.register(presenceType.type3); expect(this.doc2.presence['']).to.eql(p(1)); var connectionId = this.connection.id; - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(false); expect(this.doc2.presence).to.not.have.key(connectionId); expect(this.doc2.presence['']).to.eql(p(1)); done(); }.bind(this)); this.connection2.close(); - - // this.doc.requestReplyPresence = false; - // this.doc.submitPresence(p(0), errorHandler(done)); - // this.doc2.del(errorHandler(done)); }.bind(this) ], allDone); }); @@ -788,8 +809,9 @@ types.register(presenceType.type3); expect(this.doc2.presence['']).to.eql(p(1)); var connectionId = this.connection.id; - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(true); expect(this.doc2.presence).to.not.have.key(connectionId); expect(this.doc2.presence['']).to.eql(p(1)); done(); @@ -814,8 +836,9 @@ types.register(presenceType.type3); expect(this.doc2.presence['']).to.eql(p(1)); var connectionId = this.connection.id; - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ connectionId ]); + expect(submitted).to.equal(false); expect(this.doc2.presence).to.not.have.key(connectionId); expect(this.doc2.presence['']).to.eql(p(1)); done(); @@ -926,8 +949,9 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.presence[this.connection.id]).to.eql(p(0)); done(); }.bind(this)); @@ -945,8 +969,9 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); done(); }.bind(this)); @@ -964,8 +989,9 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc2.subscribe.bind(this.doc2), function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.presence[this.connection.id]).to.eql(p(3)); done(); }.bind(this)); @@ -985,8 +1011,11 @@ types.register(presenceType.type3); this.doc.submitPresence.bind(this.doc, p(1)), setTimeout, function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); expect(this.doc2.presence).to.not.have.key(this.connection.id); done(); }.bind(this)); @@ -1007,9 +1036,12 @@ types.register(presenceType.type3); setTimeout, function(done) { var firstCall = true; - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { if (firstCall) return firstCall = false; expect(srcList).to.eql([ this.connection.id ]); + // The call to `del` transforms the presence and fires the event. + // The call to `submitPresence` does not fire the event because presence is already null. + expect(submitted).to.equal(false); expect(this.doc2.presence).to.not.have.key(this.connection.id); done(); }.bind(this)); @@ -1028,9 +1060,10 @@ types.register(presenceType.type3); this.doc.subscribe.bind(this.doc), this.doc.submitPresence.bind(this.doc, p(1)), function(done) { - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { if (typeName === 'wrapped-presence-no-compare') { expect(srcList).to.eql([ '' ]); + expect(submitted).to.equal(true); expect(this.doc.presence['']).to.eql(p(1)); done(); } else { @@ -1050,9 +1083,10 @@ types.register(presenceType.type3); this.doc.submitPresence.bind(this.doc, p(1)), setTimeout, function(done) { - this.doc2.on('presence', function(srcList) { + this.doc2.on('presence', function(srcList, submitted) { if (typeName === 'wrapped-presence-no-compare') { expect(srcList).to.eql([ this.connection.id ]); + expect(submitted).to.equal(true); expect(this.doc2.presence[this.connection.id]).to.eql(p(1)); done(); } else { @@ -1262,10 +1296,11 @@ types.register(presenceType.type3); process.nextTick, // wait for "presence" event function(done) { var presenceEmitted = false; - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { expect(presenceEmitted).to.equal(false); presenceEmitted = true; expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); expect(this.doc.presence).to.not.have.key(''); expect(this.doc.presence).to.not.have.key(this.connection2.id); }.bind(this)); @@ -1316,10 +1351,11 @@ types.register(presenceType.type3); process.nextTick(function() { // wait for presence event this.doc.submitPresence(p(2), callback); // pendingPresence process.nextTick(function() { // wait for presence event - this.doc.on('presence', function(srcList) { + this.doc.on('presence', function(srcList, submitted) { expect(presenceEmitted).to.equal(false); presenceEmitted = true; expect(srcList.sort()).to.eql([ '', this.connection2.id ]); + expect(submitted).to.equal(false); expect(this.doc.presence).to.not.have.key(''); expect(this.doc.presence).to.not.have.key(this.connection2.id); }.bind(this)); From 173bf3a58379e61edaacb75ccfa14b69bb5d55af Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Wed, 13 Jun 2018 23:44:42 +0100 Subject: [PATCH 15/23] Use the correct variable The issue could not cause problems in practice because ot-json0 does not support presence. --- lib/client/doc.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index fa35b4926..039adcf7f 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -616,7 +616,7 @@ Doc.prototype._otApply = function(op, source) { // Apply the individual op component this.emit('before op', componentOp.op, source); this.data = this.type.apply(this.data, componentOp.op); - this._transformAllPresence(op); + this._transformAllPresence(componentOp); this.emit('op', componentOp.op, source); } // Pop whatever was submitted since we started applying this op From 054d34d90e870277372df80293bff03aeb820cd3 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Thu, 21 Jun 2018 11:43:00 +0100 Subject: [PATCH 16/23] Small test update --- test/client/presence.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/client/presence.js b/test/client/presence.js index 3ddc86bff..d74532a49 100644 --- a/test/client/presence.js +++ b/test/client/presence.js @@ -1313,7 +1313,7 @@ types.register(presenceType.type3); }.bind(this)); // send an invalid op - this.doc._submit({}, true); + this.doc._submit({}, null); }.bind(this) ], allDone); }); @@ -1362,7 +1362,7 @@ types.register(presenceType.type3); this.doc.on('error', done); // send an invalid op - this.doc._submit({ index: 3, value: 'b' }, true, callback); + this.doc._submit({ index: 3, value: 'b' }, null, callback); }.bind(this)); }.bind(this)); }.bind(this) From 15cdd1d3d451a2b5dcbdadd9fb551c912975738d Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Thu, 12 Jul 2018 12:24:25 +0200 Subject: [PATCH 17/23] Update tested nodejs versions --- .travis.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 5c87e1e6d..21efafe46 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: node_js node_js: - "10" - - "9" - "8" - "6" script: "npm run jshint && npm run test-cover" From cfca37ff92ecfe0cfba0de985e3651d4f3a51ec9 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Thu, 12 Jul 2018 12:38:21 +0200 Subject: [PATCH 18/23] Make destroy wait for unsubscribe --- lib/client/doc.js | 11 ++++++++--- test/client/subscribe.js | 13 ++++++++++++- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 33621cb9c..796df6bd7 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -105,10 +105,15 @@ Doc.prototype.destroy = function(callback) { var doc = this; doc.whenNothingPending(function() { if (doc.wantSubscribe) { - doc.unsubscribe(); + doc.unsubscribe(function(err) { + if (!err) doc.connection._destroyDoc(doc); + if (callback) return callback(err); + if (err) this.emit('error', err); + }); + } else { + doc.connection._destroyDoc(doc); + if (callback) callback(); } - doc.connection._destroyDoc(doc); - if (callback) callback(); }); }; diff --git a/test/client/subscribe.js b/test/client/subscribe.js index db2bea1b2..b24a94749 100644 --- a/test/client/subscribe.js +++ b/test/client/subscribe.js @@ -414,7 +414,7 @@ describe('client subscribe', function() { doc2.subscribe(function(err) { if (err) return done(err); doc2.on('op', function(op, context) { - done(); + done(new Error('Should not get op event')); }); doc2.destroy(function(err) { if (err) return done(err); @@ -425,6 +425,17 @@ describe('client subscribe', function() { }); }); + it('doc destroy removes doc from connection when doc is not subscribed', function(done) { + var connection = this.backend.connect(); + var doc = connection.get('dogs', 'fido'); + expect(connection.getExisting('dogs', 'fido')).equal(doc); + doc.destroy(function(err) { + if (err) return done(err); + expect(connection.getExisting('dogs', 'fido')).equal(undefined); + done(); + }); + }); + it('bulk unsubscribe stops op updates', function(done) { var connection = this.backend.connect(); var connection2 = this.backend.connect(); From 5e009d17d0b6b665eb25abc0d538d1bc67e5dc6b Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Thu, 12 Jul 2018 12:52:59 +0200 Subject: [PATCH 19/23] Simplify the code --- lib/client/doc.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 796df6bd7..d75e83085 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -106,9 +106,13 @@ Doc.prototype.destroy = function(callback) { doc.whenNothingPending(function() { if (doc.wantSubscribe) { doc.unsubscribe(function(err) { - if (!err) doc.connection._destroyDoc(doc); - if (callback) return callback(err); - if (err) this.emit('error', err); + if (err) { + if (callback) callback(err); + else this.emit('error', err); + return; + } + doc.connection._destroyDoc(doc); + if (callback) callback(); }); } else { doc.connection._destroyDoc(doc); From 56b726bd0f97fee321ecb363a993ee0e1700f65f Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Thu, 12 Jul 2018 14:30:09 +0200 Subject: [PATCH 20/23] Make hasPending depend on inflightPresence and pendingPresence --- lib/client/doc.js | 5 ++++- test/client/presence.js | 42 ++++++++++++++++++++++++++++++++--------- 2 files changed, 37 insertions(+), 10 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index 180f821d7..b07554353 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -260,7 +260,9 @@ Doc.prototype.hasPending = function() { this.inflightFetch.length || this.inflightSubscribe.length || this.inflightUnsubscribe.length || - this.pendingFetch.length + this.pendingFetch.length || + this.inflightPresence || + this.pendingPresence ); }; @@ -1077,6 +1079,7 @@ Doc.prototype._handlePresence = function(err, presence) { var called = callbacks && callEach(callbacks, err); if (err && !called) this.emit('error', err); this.flush(); + this._emitNothingPending(); } return; } diff --git a/test/client/presence.js b/test/client/presence.js index ca07c9b8b..4f9a3d31e 100644 --- a/test/client/presence.js +++ b/test/client/presence.js @@ -692,26 +692,50 @@ types.register(presenceType.type3); ], allDone); }); - it.skip('cancels pending presence on destroy', function(allDone) { + it('hasPending is true, if there is pending presence', function(allDone) { async.series([ this.doc.create.bind(this.doc, [ 'a' ], typeName), + this.doc.subscribe.bind(this.doc), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + this.doc.submitPresence(p(0)); + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.pendingPresence).to.equal(true); + expect(!!this.doc.inflightPresence).to.equal(false); + this.doc.whenNothingPending(done); + }.bind(this), function(done) { - this.doc.submitPresence(p(0), done); - console.log(!!this.doc.inflightPresence, !!this.doc.pendingPresence); - this.doc.destroy(errorHandler(done)); + expect(this.doc.hasPending()).to.equal(false); + expect(!!this.doc.pendingPresence).to.equal(false); + expect(!!this.doc.inflightPresence).to.equal(false); + done(); }.bind(this) ], allDone); }); - it.skip('cancels inflight presence on destroy', function(allDone) { + it('hasPending is true, if there is inflight presence', function(allDone) { async.series([ this.doc.create.bind(this.doc, [ 'a' ], typeName), this.doc.subscribe.bind(this.doc), function(done) { - this.doc.submitPresence(p(0), done); - process.nextTick(function() { - this.doc.destroy(errorHandler(done)); - }.bind(this)); + expect(this.doc.hasPending()).to.equal(false); + this.doc.submitPresence(p(0)); + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.pendingPresence).to.equal(true); + expect(!!this.doc.inflightPresence).to.equal(false); + process.nextTick(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(true); + expect(!!this.doc.pendingPresence).to.equal(false); + expect(!!this.doc.inflightPresence).to.equal(true); + this.doc.whenNothingPending(done); + }.bind(this), + function(done) { + expect(this.doc.hasPending()).to.equal(false); + expect(!!this.doc.pendingPresence).to.equal(false); + expect(!!this.doc.inflightPresence).to.equal(false); + done(); }.bind(this) ], allDone); }); From 762496aae09e90468c51fe6f1b88dddfb4cbde1b Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Tue, 10 Jul 2018 11:30:02 +0200 Subject: [PATCH 21/23] Remove cached ops without using setTimeout See https://github.com/share/sharedb/issues/219 --- lib/client/doc.js | 20 ++++++++++++--- test/client/presence.js | 54 +++++++++++++++++++---------------------- 2 files changed, 41 insertions(+), 33 deletions(-) diff --git a/lib/client/doc.js b/lib/client/doc.js index b07554353..42d8d37dc 100644 --- a/lib/client/doc.js +++ b/lib/client/doc.js @@ -83,7 +83,7 @@ function Doc(connection, collection, id) { this.requestReplyPresence = true; // A list of ops sent by the server. These are needed for transforming presence data, // if we get that presence data for an older version of the document. - // The ops are cached for 1 minute by default, which should be lots, considering that the presence + // The ops are cached for at least 1 minute by default, which should be lots, considering that the presence // data is supposed to be synced in real-time. this.cachedOps = []; this.cachedOpsTimeout = 60000; @@ -362,6 +362,7 @@ Doc.prototype._handleOp = function(err, message) { var serverOp = { src: message.src, + time: Date.now(), create: !!message.create, op: message.op, del: !!message.del @@ -914,6 +915,7 @@ Doc.prototype._opAcknowledged = function(message) { this.version++; this._cacheOp({ src: this.inflightOp.src, + time: Date.now(), create: !!this.inflightOp.create, op: this.inflightOp.op, del: !!this.inflightOp.del @@ -1283,8 +1285,18 @@ Doc.prototype._emitPresence = function(srcList, submitted) { }; Doc.prototype._cacheOp = function(op) { + // Remove the old ops. + var oldOpTime = Date.now() - this.cachedOpsTimeout; + var i; + for (i = 0; i < this.cachedOps.length; i++) { + if (this.cachedOps[i].time >= oldOpTime) { + break; + } + } + if (i > 0) { + this.cachedOps.splice(0, i); + } + + // Cache the new op. this.cachedOps.push(op); - setTimeout(function() { - if (this.cachedOps[0] === op) this.cachedOps.shift(); - }.bind(this), this.cachedOpsTimeout); }; diff --git a/test/client/presence.js b/test/client/presence.js index 4f9a3d31e..2bf7e7dab 100644 --- a/test/client/presence.js +++ b/test/client/presence.js @@ -473,48 +473,44 @@ types.register(presenceType.type3); ], allDone); }); - it('removes cached ops', function(allDone) { - var op = { index: 1, value: 'b' }; - this.doc.cachedOpsTimeout = 0; + it('expires cached ops', function(allDone) { + var op1 = { index: 1, value: 'b' }; + var op2 = { index: 2, value: 'b' }; + var op3 = { index: 3, value: 'b' }; + this.doc.cachedOpsTimeout = 60; async.series([ + // Cache 2 ops. this.doc.create.bind(this.doc, [ 'a' ], typeName), - this.doc.submitOp.bind(this.doc, op), - this.doc.del.bind(this.doc), + this.doc.submitOp.bind(this.doc, op1), function(done) { - expect(this.doc.cachedOps.length).to.equal(3); + expect(this.doc.cachedOps.length).to.equal(2); expect(this.doc.cachedOps[0].create).to.equal(true); - expect(this.doc.cachedOps[1].op).to.equal(op); - expect(this.doc.cachedOps[2].del).to.equal(true); + expect(this.doc.cachedOps[1].op).to.equal(op1); done(); }.bind(this), - setTimeout, - function(done) { - expect(this.doc.cachedOps.length).to.equal(0); - done(); - }.bind(this) - ], allDone); - }); - it('removes correct cached ops', function(allDone) { - var op = { index: 1, value: 'b' }; - this.doc.cachedOpsTimeout = 0; - async.series([ - this.doc.create.bind(this.doc, [ 'a' ], typeName), - this.doc.submitOp.bind(this.doc, op), - this.doc.del.bind(this.doc), + // Cache another op before the first 2 expire. + function (callback) { + setTimeout(callback, 30); + }, + this.doc.submitOp.bind(this.doc, op2), function(done) { expect(this.doc.cachedOps.length).to.equal(3); expect(this.doc.cachedOps[0].create).to.equal(true); - expect(this.doc.cachedOps[1].op).to.equal(op); - expect(this.doc.cachedOps[2].del).to.equal(true); - this.doc.cachedOps.shift(); - this.doc.cachedOps.push({ op: true }); + expect(this.doc.cachedOps[1].op).to.equal(op1); + expect(this.doc.cachedOps[2].op).to.equal(op2); done(); }.bind(this), - setTimeout, + + // Cache another op after the first 2 expire. + function (callback) { + setTimeout(callback, 31); + }, + this.doc.submitOp.bind(this.doc, op3), function(done) { - expect(this.doc.cachedOps.length).to.equal(1); - expect(this.doc.cachedOps[0].op).to.equal(true); + expect(this.doc.cachedOps.length).to.equal(2); + expect(this.doc.cachedOps[0].op).to.equal(op2); + expect(this.doc.cachedOps[1].op).to.equal(op3); done(); }.bind(this) ], allDone); From e4c5e6d827656fe3781be702cdd6b8fc7f512a02 Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Fri, 20 Jul 2018 13:44:05 +0200 Subject: [PATCH 22/23] Remove --exit mocha option --- test/mocha.opts | 1 - 1 file changed, 1 deletion(-) diff --git a/test/mocha.opts b/test/mocha.opts index 7ca4707b0..34f904192 100644 --- a/test/mocha.opts +++ b/test/mocha.opts @@ -1,4 +1,3 @@ --reporter spec --check-leaks --recursive ---exit From 428c46a61b6ea5fdefb440a3740f677051d7ed8e Mon Sep 17 00:00:00 2001 From: Greg Kubisa Date: Fri, 20 Jul 2018 13:50:46 +0200 Subject: [PATCH 23/23] Workaround for circular dependency --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 21efafe46..736e5fe78 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,6 +3,6 @@ node_js: - "10" - "8" - "6" -script: "npm run jshint && npm run test-cover" +script: "ln -s .. node_modules/sharedb; npm run jshint && npm run test-cover" # Send coverage data to Coveralls after_script: "cat ./coverage/lcov.info | ./node_modules/coveralls/bin/coveralls.js"