From 95cbef438e22e8d2fb08bb832c51fb61e1169f10 Mon Sep 17 00:00:00 2001 From: Anatoly Tsapkov Date: Wed, 4 Mar 2020 23:12:31 -0500 Subject: [PATCH] watch for key value changes --- README.md | 11 +-- lib/context.js | 9 -- lib/memkv.js | 103 ++++++++++++++-------- lib/mqtt/gate.js | 18 ++-- lib/realm.js | 48 ++++------ lib/wamp/api.js | 22 ++++- lib/wamp/gate.js | 11 +-- package-lock.json | 2 +- package.json | 2 +- test/broker.js | 214 ++++++++++++++++++++++++++++++++------------- test/pub_worker.js | 20 ++--- 11 files changed, 291 insertions(+), 169 deletions(-) diff --git a/README.md b/README.md index 9224298..436d567 100644 --- a/README.md +++ b/README.md @@ -91,8 +91,7 @@ message for the subscription. And here what could be implemented ```javascript publish('the.key', [ 'args' ], { kwArgs: true }, { - retain: 100, - // weak: 'public', + retain: true, when: { status: 'started' }, watch: false will: { value: 'to', publish: 'at', session: 'disconnect' } @@ -100,11 +99,9 @@ publish('the.key', [ 'args' ], { kwArgs: true }, { ``` ### Options Description -* retain: time in seconds to keep the message in the server memory. Zero means forever. Default value is false that means message does no retain. -* weak: The key disappears then client disconnects. (private|public) who could see the message, public by default -* when: publish only if the key meets requirements. null means that key should not be exists. -* watch: applicable if `when` option defined. Provide ability to wait the necesssary condition and do action immediately. If several clients waits for that the only one achieves acknowledge message. -* sequence: generate unique key +* retain: boolean, keep in Key Value storage. Default value is false that means message does not retain. +* when: struct, publish only if the key meets requirements. null means that key should not be exists. +* watch: boolean, applicable if `when` option defined. Provide ability to wait the necesssary condition and do action immediately. If several clients waits for that the only one achieves acknowledge message. ### Aggregate Engine for the data streams diff --git a/lib/context.js b/lib/context.js index f2b71c6..f18991a 100644 --- a/lib/context.js +++ b/lib/context.js @@ -7,7 +7,6 @@ class Context { constructor (router, session, sender) { this.router = router - this.id = undefined this.session = session this.sender = sender } @@ -16,14 +15,6 @@ class Context { return this.session } - setId (id) { - this.id = id - } - - getId () { - return this.id - } - emit (event, message, data) { this.router.emit(event, this.session, message, data) } diff --git a/lib/memkv.js b/lib/memkv.js index 75152fe..51e380b 100644 --- a/lib/memkv.js +++ b/lib/memkv.js @@ -22,65 +22,92 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { }) } + isDataFit (when, data) { + if (when === null && data === null) { + return true + } + + if (when !== null && data !== null) { + return true + } + + return false + } + setKeyActor (actor) { - const opt = actor.getOpt() const suri = restoreUri(extract(actor.getUri(), this.getUriPattern())) - const data = actor.getData() - const pub = () => { - if (actor) { - this.pubActor(actor) - } - if (data === null) { - this._keyDb.delete(suri) - } else { - this._keyDb.set(suri, [actor.getSid(), data]) + let oldSid = null + let oldData = null + let oldWill = null + let resWhen = [] + + const findWhenActor = (curActor) => { + for (let i = 0; i < resWhen.length; i++) { + const whenActor = resWhen[i] + if (this.isDataFit(whenActor.getOpt().when, curActor.getData())) { + resWhen.splice(i, 1) + return whenActor + } } + return false } - const row = this._keyDb.get(suri) - if ('when' in opt) { - if (opt.when === null) { - - if (undefined === row) { - pub() - return + const pub = (newActor) => { + do { + const newOpt = newActor.getOpt() + const newData = newActor.getData() + const willSid = 'will' in newOpt ? newActor.getSid() : null + this.pubActor(newActor) + if (newData === null) { + this._keyDb.delete(suri) + } else { + this._keyDb.set(suri, [willSid, newData, newOpt.will, resWhen]) } - const [sid, val, when] = row + newActor = findWhenActor(newActor) + } while (newActor) + } - if (val === null) { - pub() - return - } + const opt = actor.getOpt() + const oldRow = this._keyDb.get(suri) + if (oldRow) { + [oldSid, oldData, oldWill, resWhen] = oldRow + } - actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'Found value is not empty') + if ('when' in opt) { + if (this.isDataFit(opt.when, oldData)) { + pub(actor) + return + } else if (opt.watch) { + resWhen.push(actor) + return + } else { + actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'not accepted') return } - - if (undefined !== row) { - const [sid, val, when] = row - if (val !== null) { - pub() - return - } - } - actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'Value is empty') - return } - - pub() + // no when publish + pub(actor) } removeSession (sessionId) { let toRemove = [] - for (let key in this._keyDb) { - const keySessionId = this._keyDb.get(key)[0] + for (const [key, value] of this._keyDb) { + const keySessionId = value[0] if (keySessionId === sessionId) { toRemove.push(key) } } for (let i = 0; i < toRemove.length; i++) { - this.setKeyData(toRemove[i], null) + const key = toRemove[i] + const row = this._keyDb.get(key) + const will = row[2] + + if (row[2]) { + this.setKeyData(key, will) + } else { + this.setKeyData(key, null) + } } } } diff --git a/lib/mqtt/gate.js b/lib/mqtt/gate.js index a767e55..1e0ce1d 100644 --- a/lib/mqtt/gate.js +++ b/lib/mqtt/gate.js @@ -71,6 +71,14 @@ let handlers = {} let cmdAck = {} class MqttContext extends Context { + setMqttPkg (mqttPkg) { + this.mqttPkg = mqttPkg + } + + getMqttPkg () { + return this.mqttPkg + } + sendEvent (cmd) { const session = this.getSession() if (session.getLastPublishedId() == cmd.qid) { @@ -118,11 +126,11 @@ class MqttContext extends Context { sendError (cmd, code, text) { if (undefined === cmd.id) { - throw new RealmError(this.getId(), code, text) + throw new RealmError(undefined, code, text) } else { // subscribe error mode - const pkg = this.getId() + const pkg = this.getMqttPkg() pkg.granted[cmd.id] = 0x80 this.mqttSubscribeDone() } @@ -130,7 +138,7 @@ class MqttContext extends Context { mqttSubscribeDone () { // granted is accepted QoS for the subscription, possible [0,1,2, + 128 or 0x80 for failure] - const pkg = this.getId() + const pkg = this.getMqttPkg() pkg.count-- if (pkg.count === 0) { this.mqttSend({ cmd: 'suback', messageId: pkg.id, granted: pkg.granted }) @@ -300,7 +308,7 @@ cmdAck.publish = function (cmd) { handlers.puback = function (ctx, session, message) { this.checkRealm(session) - let qid = session.fetchWaitId(message.messageId) + const qid = session.fetchWaitId(message.messageId) if (qid) { session.realm.doConfirm(ctx, { id: qid @@ -325,7 +333,7 @@ handlers.subscribe = function (ctx, session, message) { granted: [], count: message.subscriptions.length } - ctx.setId(pkg) + ctx.setMqttPkg(pkg) const afterId = session.getLastPublishedId() for (let index=0; index < message.subscriptions.length; index++) { const qos = Math.min(message.subscriptions[index].qos, 1) diff --git a/lib/realm.js b/lib/realm.js index 85c6b59..047b8bb 100644 --- a/lib/realm.js +++ b/lib/realm.js @@ -34,7 +34,7 @@ class Actor { getOpt () { if (this.msg.opt !== undefined) { - return this.msg.opt + return Object.assign({}, this.msg.opt) } else { return {} } @@ -188,6 +188,7 @@ class ActorPush extends Actor { constructor (ctx, msg) { super(ctx, msg) this.clientNotified = false + this.eventId = null } setEventId (eventId) { @@ -203,7 +204,6 @@ class ActorPush extends Actor { clone.qid = this.eventId this.ctx.acknowledged(clone) } - // this.realm.engine.qConf.doneDefer(this.sid, this.eventId) } } @@ -277,7 +277,6 @@ class BaseEngine { this.qYield = new DeferMap() this.wTrace = new Qlobber() // [uri][subscription] - this.qConf = new DeferMap() } getSubStack (uri) { @@ -417,30 +416,23 @@ class BaseEngine { // By default, a Publisher of an event will not itself receive an event published, // even when subscribed to the topic the Publisher is publishing to. // If supported by the Broker, this behavior can be overridden via the option exclude_me set to false. - dispatch (actor) { - actor.destSID = {} - - let found = this.matchTrace(actor.getUri()) + dispatch (event, excludeSid) { + let destSID = {} + const found = this.matchTrace(event.uri) for (let i = 0; i < found.length; i++) { - let subD = found[i] - actor.destSID[subD.getSid()] = true - - if (actor.getSid() !== subD.getSid() || - (!actor.getOpt().exclude_me) - ) { - this.actorPush(subD, actor.getEvent()) + const subD = found[i] + if (excludeSid !== subD.getSid()) { + destSID[subD.getSid()] = true + this.actorPush(subD, event) } } - this.actorConfirm(actor, actor.msg) + return destSID } doPush (actor) { - this.qConf.addDefer(actor, actor.eventId) - this.dispatch(actor) - } - - doConfirm (actor, cmd) { - this.actorConfirm(actor, cmd) + const excludeSid = actor.getOpt().exclude_me ? actor.getSid() : 0 + actor.destSID = this.dispatch(actor.getEvent(), excludeSid) + this.actorConfirm(actor, actor.msg) } getHistoryAfter (after, uri, cbRow) { return new Promise(() => {}) } @@ -536,13 +528,7 @@ class BaseRealm extends EventEmitter { } } - doConfirm (ctx, cmd) { - const session = ctx.getSession() - const defer = this.engine.qConf.getDefer(session.sessionId, cmd.qid) - if (defer) { - this.engine.doConfirm(defer, cmd) - } - } + doConfirm (ctx, cmd) {} // declare wamp.topic.history.after (uri, arg) // last limit|integer @@ -726,7 +712,7 @@ class ActorPushKv { } getOpt () { - return this.opt + return Object.assign({}, this.opt) } getUri () { @@ -792,9 +778,9 @@ class MemEngine extends BaseEngine { this._messages = [] } - dispatch (actor) { + dispatch (event, excludeSid) { // event.qid = ++this._messageGen - return super.dispatch(actor) + return super.dispatch(event, excludeSid) } doPush (actor) { diff --git a/lib/wamp/api.js b/lib/wamp/api.js index 04b35d6..4b9c5f0 100644 --- a/lib/wamp/api.js +++ b/lib/wamp/api.js @@ -29,7 +29,6 @@ class WampApiContext extends Context { } acknowledged (cmd) { - // console.log('ACK message not handled', cmd) if (cmd.id && cmd.id.resolve) { cmd.id.resolve(cmd.qid) } @@ -109,7 +108,26 @@ function WampApi (realm) { if (opt.exclude_me !== false) { opt.exclude_me = true } - return realm.doPush(ctx, { uri: wampParse(uri), opt, data: { args, kwargs } }) + let result + let subContainer + let ack + if (opt.acknowledge) { + ack = true + delete opt.acknowledge + subContainer = {} + result = new Promise((resolve, reject) => { + subContainer.resolve = resolve + subContainer.reject = reject + }) + } + realm.doPush(ctx, { + id: subContainer, + uri: wampParse(uri), + opt, + data: { args, kwargs }, + ack + }) + return result } // gate override/internal part diff --git a/lib/wamp/gate.js b/lib/wamp/gate.js index 6ce8a4d..df5b6ab 100644 --- a/lib/wamp/gate.js +++ b/lib/wamp/gate.js @@ -283,7 +283,6 @@ handlers[WAMP.REGISTER] = function (ctx, session, message) { let inopt = message.shift() let uri = wampParse(message.shift()) - ctx.setId(id) this.checkRealm(session, id) // this.checkAuthorize(ctx, cmd, 'register') @@ -315,7 +314,6 @@ handlers[WAMP.CALL] = function (ctx, session, message) { var args = message.shift() || [] var kwargs = message.shift() || null - ctx.setId(id) this.checkRealm(session, id) // this.checkAuthorize(ctx, cmd, 'call') let cmd = { @@ -334,6 +332,12 @@ handlers[WAMP.CALL] = function (ctx, session, message) { session.realm.doCallRpc(ctx, cmd) } +handlers[WAMP.CANCEL] = function (ctx, session, message) { + var id = message.shift() + var opt = message.shift() || {} + session.realm.doCancel(ctx, { wtype: WAMP.CANCEL, id, opt }) +} + handlers[WAMP.UNREGISTER] = function (ctx, session, message) { var id = message.shift() var unr = message.shift() @@ -371,7 +375,6 @@ handlers[WAMP.SUBSCRIBE] = function (ctx, session, message) { const opt = message.shift() const uri = wampParse(message.shift()) - ctx.setId(id) this.checkRealm(session, id) const cmd = { wtype: WAMP.SUBSCRIBE, @@ -409,8 +412,6 @@ handlers[WAMP.PUBLISH] = function (ctx, session, message) { const args = message.shift() || [] const kwargs = message.shift() || null - ctx.setId(id) - const cmd = { wtype: WAMP.PUBLISH, id, diff --git a/package-lock.json b/package-lock.json index 48b3f47..5c756f5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "fox-wamp", - "version": "0.7.2", + "version": "0.7.3", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index 420d3a3..f681989 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fox-wamp", - "version": "0.7.2", + "version": "0.7.3", "description": "Web Application Message Router/Server WAMP/MQTT", "author": { "name": "Anatoly Tsapkov", diff --git a/test/broker.js b/test/broker.js index d6c9a57..f90311e 100644 --- a/test/broker.js +++ b/test/broker.js @@ -3,11 +3,15 @@ const chai = require('chai') const spies = require('chai-spies') const expect = chai.expect +const assert = chai.assert +const promised = require('chai-as-promised') + const { RESULT_OK, RESULT_ACK, RESULT_ERR } = require('../lib/messages') const errorCodes = require('../lib/realm_error').errorCodes const FoxGate = require('../lib/hyper/gate') const Router = require('../lib/router') +chai.use(promised) chai.use(spies) describe('hyper-broker', function () { @@ -30,8 +34,10 @@ describe('hyper-broker', function () { }) afterEach(function () { - session.cleanup() - session = null + if (session) { + session.cleanup() + session = null + } }) it('echo should return OK with sent data', function () { @@ -159,86 +165,174 @@ describe('hyper-broker', function () { expect(sender.send).to.have.been.called.twice() }) - it('published-confirm', function () { - const idTrace = 20 - const idUnTrace = 21 - const idPush = 22 - let regTrace - let regPush - - // make realm replicable - realm.engine.actorConfirm = (actor, cmd) => {}; - - realm.engine.doConfirm = (actor, cmd) => { - actor.confirm(cmd) - } + it('push-will', function () { + const api = realm.foxApi() - sender.send = chai.spy((msg) => { - regTrace = msg.data - expect(msg).to.deep.equal({ - id: idTrace, - rsp: RESULT_ACK, - data: regTrace - }) + let n = 0 + const event = chai.spy((id, event) => { + n++ + if (n === 1) { + expect(event).to.deep.equal({ kv: { event: 'value' } }) + } + if (n === 2) { + expect(event).to.deep.equal({ kv: { will: 'value' } }) + } }) + api.subscribe(['will', 'test'], event) session.handle(ctx, { - ft: 'TRACE', - uri: ['testQ'], - id: idTrace, - opt: {} + ft: 'PUSH', + data: { kv: { event: 'value' } }, + uri: ['will', 'test'], + opt: { retain: true, will: { kv: { will: 'value' } } } }) - expect(sender.send).to.have.been.called.once() + expect(event).to.have.been.called.once() + + session.cleanup() + expect(event).to.have.been.called.twice() + }) + it('push-watch-for-push', function () { + let n = 0 sender.send = chai.spy((msg) => { - regPush = msg.qid - expect(msg).to.deep.equal({ - id: idTrace, - uri: ['testQ'], - qid: regPush, - opt: {}, - rsp: 'EVENT', - data: 'published-data' - }) + n++ + if (n === 1) { + expect(msg).to.deep.equal({ id: 'init-kv', rsp: 'OK', data: undefined }) + } else if (n === 2) { + expect(msg).to.deep.equal({ id: 'watch-for-value', rsp: 'OK', data: undefined }) + } }) + const api = realm.foxApi() + + let m = 0 + const event = chai.spy((id, event) => { + m++ + if (m === 1) { + expect(event).to.deep.equal({ kv: { event: 'value' } }) + } else if (m === 2) { + expect(event).to.deep.equal({ kv: { event: 'watch-for-empty' } }) + } + }) + api.subscribe(['watch', 'test'], event) + session.handle(ctx, { ft: 'PUSH', - uri: ['testQ'], + data: { kv: { event: 'value' } }, + uri: ['watch', 'test'], + opt: { + retain: true, + trace: true + }, ack: true, - data: 'published-data', - id: idPush + id: 'init-kv' }) + expect(event).to.have.been.called.once() expect(sender.send).to.have.been.called.once() - - sender.send = chai.spy((msg) => { - expect(msg).to.deep.equal({ - id: idPush, - qid: regPush, - rsp: RESULT_OK, - data: 'confirm-data' - }) - }) + expect(realm.engine._messages.length).to.equal(1) session.handle(ctx, { - ft: 'CONFIRM', - qid: regPush, - data: 'confirm-data' + ft: 'PUSH', + data: { kv: { event: 'watch-for-empty' } }, + uri: ['watch', 'test'], + opt: { + trace: true, + retain: true, + when: null, + watch: true + }, + ack: true, + id: 'watch-for-value' }) + expect(event).to.have.been.called.once() expect(sender.send).to.have.been.called.once() + expect(realm.engine._messages.length).to.equal(1) - sender.send = chai.spy((msg) => { - expect(msg).to.deep.equal({ - id: idUnTrace, - rsp: RESULT_OK - }) + api.publish(['watch', 'test'], null, { retain: true }) + expect(event).to.have.been.called.twice() + expect(sender.send).to.have.been.called.twice() + expect(realm.engine._messages.length).to.equal(2) + }) + + it('push-watch-for-will', function () { + let defer = [] + sender.send = chai.spy((msg) => {}) + + const api = realm.wampApi() + + let m = 0 + const event = chai.spy((id, args, kwargs) => { + m++ + if (m === 1) { + expect(args).to.deep.equal([]) + expect(kwargs).to.deep.equal({ event: 'value-1' }) + } else if (m === 2) { + expect(args).to.deep.equal([]) + expect(kwargs).to.deep.equal(undefined) + } else if (m === 3) { + expect(args).to.deep.equal([]) + expect(kwargs).to.deep.equal({ event: 'value-2' }) + } else { + expect(true).to.equal('no more events') + } }) + api.subscribe('watch.test', event) session.handle(ctx, { - ft: 'UNTRACE', - unr: regTrace, - id: idUnTrace + ft: 'PUSH', + data: { kv: { event: 'value-1' } }, + uri: ['watch', 'test'], + opt: { + retain: true, + trace: true, + when: null, + will: null + }, + ack: true, + id: 'init-kv' + }) + + defer.push(assert.isRejected( + api.publish( + 'watch.test', + [], + { event: 'value-no-watch' }, + { + retain: true, + trace: true, + when: null, + will: null, + acknowledge: true + } + ) + )) + + defer.push(assert.becomes( + api.publish( + 'watch.test', + [], + { event: 'value-2' }, + { + retain: true, + trace: true, + when: null, + will: null, + watch: true, + acknowledge: true, + exclude_me: false + } + ).then(() => { + expect(session, 'after disconnect').to.equal(null) + return 'LOCK-DONE' + }), + 'LOCK-DONE' + )) + + session.cleanup() + session = null + + return Promise.all(defer).then(() => { + expect(event).to.have.been.called.exactly(3) }) - expect(sender.send).to.have.been.called.once() }) }) diff --git a/test/pub_worker.js b/test/pub_worker.js index e3d29c9..43714d8 100644 --- a/test/pub_worker.js +++ b/test/pub_worker.js @@ -36,20 +36,20 @@ describe('pub-worker', function () { worker = null }) - it('echo should return OK with sent data', function (done) { - assert.becomes( + it('echo should return OK with sent data', function () { + return assert.becomes( client.echo('test'), 'test', 'echo done' - ).notify(done) + ) }) - it('call to not existed function has to be failed', function (done) { - assert.isRejected( - client.call('test.func', { attr1: 1, attr2: 2 }), + it('call to not existed function has to be failed', function () { + return assert.isRejected( + client.call('test.func', { attr1: 1 }), /no callee registered for procedure/, 'call rejected' - ).notify(done) + ) }); it('remote-procedure-call', function (done) { @@ -73,8 +73,8 @@ describe('pub-worker', function () { ) }) - it('call-progress', function (done) { - worker.register( + it('call-progress', function () { + return worker.register( 'test.func', function (args, task) { expect(task.getUri()).to.deep.equal([ 'test', 'func' ]) expect(args).to.deep.equal({ attr1: 1, attr2: 2 }) @@ -88,7 +88,7 @@ describe('pub-worker', function () { client.call('test.func', { attr1: 1, attr2: 2 }), { result: 'done' }, 'call should be processed' - ).notify(done) + ) }, function (reason) { assert(false, 'unable to register')