From f06c0b54e1fbb2a3d3783e8093662d2c8ca8c682 Mon Sep 17 00:00:00 2001 From: Anatoly Tsapkov Date: Sat, 20 Mar 2021 01:15:28 -0400 Subject: [PATCH] refactor kv storage --- lib/binder.js | 6 +- lib/mono/membinder.js | 75 ++------------------- lib/mono/memkv.js | 90 +++---------------------- lib/mqtt/gate.js | 30 ++++----- lib/realm.js | 148 ++++++++++++++++++++++++++++++++++++++++-- lib/wamp/gate.js | 25 ++++--- test/05.realm.js | 37 +++++++---- test/07.mqtt.js | 12 ++-- test/09.kv.js | 103 +++++++++++++++-------------- 9 files changed, 272 insertions(+), 254 deletions(-) diff --git a/lib/binder.js b/lib/binder.js index 0f340c1..6cf12e8 100644 --- a/lib/binder.js +++ b/lib/binder.js @@ -8,15 +8,15 @@ class ReactEngine extends BaseEngine { this.binder = binder } - doPush (actor) { - return this.binder.doPush(this, actor) + keepHistory (actor) { + return this.binder.keepHistory(this, actor) } getHistoryAfter (after, uri, cbRow) { return this.binder.getHistoryAfter(this, after, uri, cbRow) } - cleanupSession(sessionId) { // override + cleanupSession(sessionId) { return this.binder.cleanupSession(this, sessionId) } } diff --git a/lib/mono/membinder.js b/lib/mono/membinder.js index 679dfb8..2c6512e 100644 --- a/lib/mono/membinder.js +++ b/lib/mono/membinder.js @@ -1,6 +1,6 @@ 'use strict' -const { match, intersect, merge, extract } = require('../topic_pattern') +const { match } = require('../topic_pattern') const { AbstractBinder, BaseEngine, BaseRealm } = require('../realm') const { MemKeyValueStorage } = require('./memkv') @@ -9,40 +9,20 @@ class MemEngine extends BaseEngine { super() this._messageGen = 0 this._messages = [] - this._kvo = [] // key value order } - setKeyActor (actor) { - const uri = actor.getUri() - for (let i = this._kvo.length - 1; i >= 0; i--) { - const kvr = this._kvo[i] - if (match(uri, kvr.uri)) { - kvr.kv.setKeyActor(actor) - break - } - } - } - - memPush (actor) { + keepHistory (actor) { actor.setEventId(++this._messageGen) - actor.destSID = this.dispatch(actor.getEvent()) - actor.confirm(actor.msg) - if (actor.getOpt().trace) { this._messages.push(actor.getEvent()) if (this._messages.length > 10100) { this._messages = this._messages.splice(100) } } - } - doPush (actor) { - if (actor.getOpt().retain) { - this.setKeyActor(actor) - } else { - this.memPush(actor) - } + actor.destSID = this.dispatch(actor.getEvent()) + actor.confirm(actor.msg) } getHistoryAfter (after, uri, cbRow) { @@ -56,51 +36,6 @@ class MemEngine extends BaseEngine { resolve() }) } - - getKey (uri, cbRow) { - const done = [] - for (let i = this._kvo.length - 1; i >= 0; i--) { - const kvr = this._kvo[i] - // console.log('MATCH++', uri, kvr.uri, extract(uri, kvr.uri)) - if (intersect(uri, kvr.uri)) { - done.push(kvr.kv.getKey( - extract(uri, kvr.uri), - (aKey, data) => { - cbRow(merge(aKey, kvr.uri), data) - } - )) - } - } - return Promise.all(done) - } - - setKeyData (uri, data) { - for (let i = this._kvo.length - 1; i >= 0; i--) { - const kvr = this._kvo[i] - if (match(uri, kvr.uri)) { - kvr.kv.setKeyData(extract(uri, kvr.uri), data) - break - } - } - } - - registerKeyValueEngine (uri, kv) { - kv.setUriPattern(uri) - kv.pubActor = this.memPush.bind(this) - this._kvo.push({ uri, kv }) - } - - cleanupSession(sessionId) { // override - for (let i = this._kvo.length - 1; i >= 0; i--) { - this._kvo[i].kv.removeSession(sessionId) - } - } -} - -class MemRealm extends BaseRealm { - registerKeyValueEngine (uri, kv) { - return this.engine.registerKeyValueEngine(uri, kv) - } } class MemBinder extends AbstractBinder { @@ -108,7 +43,7 @@ class MemBinder extends AbstractBinder { const engine = new MemEngine() engine.registerKeyValueEngine(['#'], new MemKeyValueStorage()) - return new MemRealm( + return new BaseRealm( router, engine ) diff --git a/lib/mono/memkv.js b/lib/mono/memkv.js index 9974006..4bf5dd0 100644 --- a/lib/mono/memkv.js +++ b/lib/mono/memkv.js @@ -1,76 +1,8 @@ 'use strict' -const { merge, match, extract, defaultParse, restoreUri } = require('../topic_pattern') +const { match, extract, defaultParse, restoreUri } = require('../topic_pattern') const errorCodes = require('../realm_error').errorCodes -const { isDataFit } = require('../realm') - -class ActorPushKv { - constructor (uri, data, opt) { - this.uri = uri - this.data = data - this.opt = opt - this.eventId = null - } - - getOpt () { - return Object.assign({}, this.opt) - } - - getUri () { - return this.uri - } - - getSid() { - return this.opt.sid - } - - getData () { - return this.data - } - - setEventId (id) { - this.eventId = id - } - - getEvent () { - return { - qid: this.eventId, - uri: this.getUri(), - data: this.getData(), - opt: this.getOpt() - } - } - - confirm () {} -} - -class KeyValueStorageAbstract { - constructor () { - this.uriPattern = '#' - } - - setUriPattern (uriPattern) { - this.uriPattern = uriPattern - } - - getUriPattern () { - return this.uriPattern - } - - // pubActor (actor) virtual abstract - - setKeyData (key, data) { - this.setKeyActor( - new ActorPushKv( - merge(key, this.uriPattern), - data, - {}) - ) - } - - // Promise:getKey (uri, cbRow) ::cbRow:: aKey, data - // removeSession (sessionId) -} +const { KeyValueStorageAbstract, isDataFit } = require('../realm') class MemKeyValueStorage extends KeyValueStorageAbstract { constructor () { @@ -98,7 +30,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { let oldData = null let resWhen = [] - const findWhenActor = (curActor) => { + const findNextWhenActor = (curData) => { for (let i = 0; i < resWhen.length; i++) { const whenActor = resWhen[i] if (!whenActor.isActive()) { @@ -106,7 +38,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { i-- continue } - if (isDataFit(whenActor.getOpt().when, curActor.getData())) { + if (isDataFit(whenActor.getOpt().when, curData)) { resWhen.splice(i, 1) return whenActor } @@ -114,18 +46,18 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { return false } - const pub = (newActor) => { + const pubWhile = (newActor) => { do { const newOpt = newActor.getOpt() const newData = newActor.getData() - const willSid = 'will' in newOpt ? newActor.getSid() : null - this.pubActor(newActor) + const willSid = ('will' in newOpt) ? newActor.getSid() : null + this.keepHistory(newActor) if (newData === null) { this._keyDb.delete(suri) } else { this._keyDb.set(suri, [willSid, newData, newOpt.will, resWhen]) } - newActor = findWhenActor(newActor) + newActor = findNextWhenActor(newActor.getData()) } while (newActor) } @@ -137,7 +69,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { if ('when' in opt) { if (isDataFit(opt.when, oldData)) { - pub(actor) + pubWhile(actor) return } else if (opt.watch) { resWhen.push(actor) @@ -148,7 +80,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { } } // no when publish - pub(actor) + pubWhile(actor) } removeSession (sessionId) { @@ -180,6 +112,4 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { } } -exports.KeyValueStorageAbstract = KeyValueStorageAbstract -exports.ActorPushKv = ActorPushKv exports.MemKeyValueStorage = MemKeyValueStorage diff --git a/lib/mqtt/gate.js b/lib/mqtt/gate.js index 0c52777..3a7e4f0 100644 --- a/lib/mqtt/gate.js +++ b/lib/mqtt/gate.js @@ -71,14 +71,18 @@ let handlers = {} let cmdAck = {} class MqttContext extends Context { - setMqttPkg (mqttPkg) { + setSubscribePkg (mqttPkg) { this.mqttPkg = mqttPkg } - getMqttPkg () { + getSubscribePkg () { return this.mqttPkg } + setMqttType (mqttType) { + this.mqttType = mqttType + } + sendEvent (cmd) { const session = this.getSession() if (session.getLastPublishedId() == cmd.qid) { @@ -113,7 +117,7 @@ class MqttContext extends Context { } acknowledged (cmd) { - cmdAck[cmd.mtype].call(this, cmd) + cmdAck[this.mqttType].call(this, cmd) } mqttSend (msg, callback) { @@ -130,7 +134,7 @@ class MqttContext extends Context { } else { // subscribe error mode - const pkg = this.getMqttPkg() + const pkg = this.getSubscribePkg() pkg.granted[cmd.id] = 0x80 this.mqttSubscribeDone() } @@ -138,7 +142,7 @@ class MqttContext extends Context { mqttSubscribeDone () { // granted is accepted QoS for the subscription, possible [0,1,2, + 128 or 0x80 for failure] - const pkg = this.getMqttPkg() + const pkg = this.getSubscribePkg() pkg.count-- if (pkg.count === 0) { this.mqttSend({ cmd: 'suback', messageId: pkg.id, granted: pkg.granted }) @@ -232,7 +236,6 @@ class MqttGate extends BaseGate { } const cmd = { - mtype: 'publish', uri: mqttParse(message.topic), data: (message.payload.length === 0 ? null @@ -253,14 +256,14 @@ class MqttGate extends BaseGate { ctx.mqttClose(1003, 'protocol violation') return } - let mtype = msg.cmd - if (!handlers[mtype]) { + if (!handlers[msg.cmd]) { this._router.emit(SESSION_WARNING, session, 'command not found', msg) ctx.mqttClose(1003, 'protocol violation') return } + ctx.setMqttType(msg.cmd) try { - handlers[mtype].call(this, ctx, session, msg) + handlers[msg.cmd].call(this, ctx, session, msg) } catch (err) { if (err instanceof RealmError) { console.log(err) @@ -333,7 +336,7 @@ handlers.subscribe = function (ctx, session, message) { granted: [], count: message.subscriptions.length } - ctx.setMqttPkg(pkg) + ctx.setSubscribePkg(pkg) const afterId = session.getLastPublishedId() for (let index=0; index < message.subscriptions.length; index++) { const qos = Math.min(message.subscriptions[index].qos, 1) @@ -349,12 +352,7 @@ handlers.subscribe = function (ctx, session, message) { if (message.retain) { opt.retained = true } - const cmd = { - mtype: 'subscribe', - id: index, - uri, - opt - } + const cmd = { id: index, uri, opt } if (this.checkAuthorize(ctx, cmd, 'subscribe')) { session.realm.doTrace(ctx, cmd) } diff --git a/lib/realm.js b/lib/realm.js index 29105d5..426f5c0 100644 --- a/lib/realm.js +++ b/lib/realm.js @@ -6,7 +6,7 @@ const EventEmitter = require('events').EventEmitter const { SESSION_JOIN, SESSION_LEAVE, RESULT_EMIT, ON_SUBSCRIBED, ON_UNSUBSCRIBED, ON_REGISTERED, ON_UNREGISTERED } = require('./messages') -const { restoreUri } = require('./topic_pattern') +const { match, intersect, merge, extract, restoreUri } = require('./topic_pattern') const errorCodes = require('./realm_error').errorCodes const RealmError = require('./realm_error').RealmError const WampApi = require('./wamp/api') @@ -336,6 +336,7 @@ class BaseEngine { this.qYield = new DeferMap() this.wTrace = new Qlobber() // [uri][subscription] + this._kvo = [] // key value order } getRealmName() { @@ -506,7 +507,15 @@ class BaseEngine { // By default, a Publisher of an event will not itself receive an event published, // even when subscribed to the topic the Publisher is publishing to. // If supported by the Broker, this behavior can be overridden via the option exclude_me set to false. - dispatch (event, eventOpt) { + + // event + // qid: this.eventId, + // sid: + // uri: + // data: + // opt: {exclude_me} + + dispatch (event) { let destSID = {} const found = this.matchTrace(event.uri) for (let i = 0; i < found.length; i++) { @@ -519,15 +528,70 @@ class BaseEngine { return destSID } - doPush (actor) { + keepHistory (actor) { actor.destSID = this.dispatch(actor.getEvent()) actor.confirm(actor.msg) } + doPush (actor) { + if (actor.getOpt().retain) { + return this.setValueFromActor(actor) + } else { + return this.keepHistory(actor) + } + } + + setValueFromActor (actor) { + const uri = actor.getUri() + for (let i = this._kvo.length - 1; i >= 0; i--) { + const kvr = this._kvo[i] + if (match(uri, kvr.uri)) { + kvr.kv.setKeyActor(actor) + break + } + } + } + + getKey (uri, cbRow) { + const done = [] + for (let i = this._kvo.length - 1; i >= 0; i--) { + const kvr = this._kvo[i] + // console.log('MATCH++', uri, kvr.uri, extract(uri, kvr.uri)) + if (intersect(uri, kvr.uri)) { + done.push(kvr.kv.getKey( + extract(uri, kvr.uri), + (aKey, data) => { + cbRow(merge(aKey, kvr.uri), data) + } + )) + } + } + return Promise.all(done) + } + + setKeyData (uri, data) { + for (let i = this._kvo.length - 1; i >= 0; i--) { + const kvr = this._kvo[i] + if (match(uri, kvr.uri)) { + kvr.kv.setKeyData(extract(uri, kvr.uri), data) + break + } + } + } + + registerKeyValueEngine (uri, kv) { + kv.setUriPattern(uri) + kv.keepHistory = this.keepHistory.bind(this) + this._kvo.push({ uri, kv }) + } + + cleanupSession(sessionId) { + for (let i = this._kvo.length - 1; i >= 0; i--) { + this._kvo[i].kv.removeSession(sessionId) + } + } + getHistoryAfter (after, uri, cbRow) { return new Promise(() => {}) } - getKey (uri, cbRow) {} - setKeyData (uri, data) {} - cleanupSession(sessionId) {} } class BaseRealm extends EventEmitter { @@ -718,6 +782,76 @@ class BaseRealm extends EventEmitter { setKeyData (uri, data) { return this.engine.setKeyData(uri, data) } + + registerKeyValueEngine (uri, kv) { + return this.engine.registerKeyValueEngine(uri, kv) + } +} + +class ActorPushKv { + constructor (uri, data, opt) { + this.uri = uri + this.data = data + this.opt = opt + this.eventId = null + } + + getOpt () { + return Object.assign({}, this.opt) + } + + getUri () { + return this.uri + } + + getSid() { + return this.opt.sid + } + + getData () { + return this.data + } + + setEventId (id) { + this.eventId = id + } + + getEvent () { + return { + qid: this.eventId, + uri: this.getUri(), + data: this.getData(), + opt: this.getOpt() + } + } + + confirm () {} +} + +class KeyValueStorageAbstract { + constructor () { + this.uriPattern = '#' + } + + setUriPattern (uriPattern) { + this.uriPattern = uriPattern + } + + getUriPattern () { + return this.uriPattern + } + + setKeyData (key, data) { + this.setKeyActor( + new ActorPushKv( + merge(key, this.uriPattern), + data, + {}) + ) + } + + // Promise:getKey (uri, cbRow) ::cbRow:: aKey, data + // removeSession (sessionId) } class AbstractBinder extends EventEmitter { @@ -746,5 +880,7 @@ exports.unSerializeData = unSerializeData exports.DeferMap = DeferMap exports.BaseEngine = BaseEngine exports.BaseRealm = BaseRealm +exports.ActorPushKv = ActorPushKv +exports.KeyValueStorageAbstract = KeyValueStorageAbstract exports.AbstractBinder = AbstractBinder exports.ZeroBinder = ZeroBinder diff --git a/lib/wamp/gate.js b/lib/wamp/gate.js index b8cae39..18f265f 100644 --- a/lib/wamp/gate.js +++ b/lib/wamp/gate.js @@ -13,6 +13,10 @@ let handlers = {} let cmdAck = {} class WampContext extends Context { + setWampType (msgType) { + this.msgType = msgType + } + sendInvoke (cmd) { let invOpts = {} if (cmd.opt.receive_progress) { @@ -70,11 +74,11 @@ class WampContext extends Context { } acknowledged (cmd) { - cmdAck[cmd.wtype].call(this, cmd) + cmdAck[this.msgType].call(this, cmd) } sendError (cmd, errorCode, text) { - return this.wampSendError(cmd.wtype, cmd.id, errorCode, text) + return this.wampSendError(this.msgType, cmd.id, errorCode, text) } wampSendError (mtype, requestId, errorCode, text) { @@ -240,6 +244,7 @@ class WampGate extends BaseGate { ctx.wampClose(1003, 'protocol violation') return } + ctx.setWampType(mtype) try { handlers[mtype].call(this, ctx, session, msg) } catch (err) { @@ -303,7 +308,7 @@ handlers[WAMP.REGISTER] = function (ctx, session, message) { // https://autobahn.readthedocs.io/en/latest/reference/autobahn.wamp.html#autobahn.wamp.types.RegisterOptions // option.concurrency – if used, the number of times a particular endpoint may be called concurrently - session.realm.doRegRpc(ctx, { wtype: WAMP.REGISTER, id, uri, opt }) + session.realm.doRegRpc(ctx, { id, uri, opt }) } cmdAck[WAMP.REGISTER] = function (cmd) { @@ -338,7 +343,7 @@ handlers[WAMP.CALL] = function (ctx, session, message) { handlers[WAMP.CANCEL] = function (ctx, session, message) { var id = message.shift() var opt = message.shift() || {} - session.realm.doCancel(ctx, { wtype: WAMP.CANCEL, id, opt }) + session.realm.doCancel(ctx, { id, opt }) } handlers[WAMP.UNREGISTER] = function (ctx, session, message) { @@ -346,7 +351,7 @@ handlers[WAMP.UNREGISTER] = function (ctx, session, message) { var unr = message.shift() this.checkRealm(session, id) - session.realm.doUnRegRpc(ctx, { wtype: WAMP.UNREGISTER, id, unr }) + session.realm.doUnRegRpc(ctx, { id, unr }) } cmdAck[WAMP.UNREGISTER] = function (cmd) { @@ -379,12 +384,7 @@ handlers[WAMP.SUBSCRIBE] = function (ctx, session, message) { const uri = wampParse(message.shift()) this.checkRealm(session, id) - const cmd = { - wtype: WAMP.SUBSCRIBE, - id, - uri, - opt - } + const cmd = { id, uri, opt } if (this.checkAuthorize(ctx, cmd, 'subscribe')) { session.realm.doTrace(ctx, cmd) } @@ -399,7 +399,7 @@ handlers[WAMP.UNSUBSCRIBE] = function (ctx, session, message) { const unr = message.shift() this.checkRealm(session, id) - session.realm.doUnTrace(ctx, { wtype: WAMP.UNSUBSCRIBE, id, unr }) + session.realm.doUnTrace(ctx, { id, unr }) } cmdAck[WAMP.UNSUBSCRIBE] = function (cmd) { @@ -416,7 +416,6 @@ handlers[WAMP.PUBLISH] = function (ctx, session, message) { const kwargs = message.shift() || null const cmd = { - wtype: WAMP.PUBLISH, id, uri, data: (kwargs === null && args instanceof Array && args.length === 0 diff --git a/test/05.realm.js b/test/05.realm.js index d60aa77..b80af4a 100644 --- a/test/05.realm.js +++ b/test/05.realm.js @@ -373,30 +373,39 @@ describe('05. wamp-realm', function () { expect(sender.send, 'publication received').to.have.been.called.once() }) - it('SUBSCRIBE-to-remote', function () { - var subSpy = chai.spy( + it('SUBSCRIBE-to-remote-wamp', async function () { + let waitForEvent + let subSpy = chai.spy( function (publicationId, args, kwargs) { expect(args).to.deep.equal(['arg.1', 'arg.2']) expect(kwargs).to.deep.equal({ foo: 'bar' }) + waitForEvent() + waitForEvent = undefined } ) - api.subscribe('topic1', subSpy).then((subId) => { - sender.send = chai.spy( - function (msg, callback) { - expect(msg[0]).to.equal(WAMP.PUBLISHED) - expect(msg[1]).to.equal(2345) - } - ) + sender.send = chai.spy((msg, callback) => { + expect(msg[0]).to.equal(WAMP.PUBLISHED) + expect(msg[1]).to.equal(2345) + }) + + let subId = await api.subscribe('topic1', subSpy) + + await new Promise((resolve, reject) => { + waitForEvent = resolve cli.handle(ctx, [WAMP.PUBLISH, 1234, {}, 'topic1', ['arg.1', 'arg.2'], { foo: 'bar' }]) - expect(sender.send, 'published').to.not.have.been.called() - cli.handle(ctx, [WAMP.PUBLISH, 2345, { acknowledge: true }, 'topic1', ['arg.1', 'arg.2'], { foo: 'bar' }]) - expect(sender.send, 'published').to.have.been.called.once() + }) + expect(sender.send, 'ack is not requested').to.not.have.been.called() - expect(subSpy, 'publication done').to.have.been.called.twice() - expect(api.unsubscribe(subId)).to.equal('topic1') + await new Promise((resolve, reject) => { + waitForEvent = resolve + cli.handle(ctx, [WAMP.PUBLISH, 2345, { acknowledge: true }, 'topic1', ['arg.1', 'arg.2'], { foo: 'bar' }]) }) + expect(sender.send, 'ack must be received').to.have.been.called.once() + expect(subSpy, 'publication done').to.have.been.called.twice() + expect(api.unsubscribe(subId)).to.equal('topic1') }) + }) describe('STORAGE', function () { diff --git a/test/07.mqtt.js b/test/07.mqtt.js index 7d26b14..3705004 100644 --- a/test/07.mqtt.js +++ b/test/07.mqtt.js @@ -36,10 +36,12 @@ describe('07. mqtt-realm', function () { }) describe('publish', function () { - it('to-qos1', function () { - sender.send = chai.spy( - function (msg, callback) {} - ) + it('to-qos1', (done) => { + sender.send = (msg) => { + expect(msg.messageId).to.equal(9191) + expect(msg.cmd).to.equal('puback') + done() + } cli.handle(ctx, { cmd: 'publish', retain: false, @@ -47,9 +49,9 @@ describe('07. mqtt-realm', function () { dup: false, length: 17, topic: 'topic1', + messageId: 9191, payload: Buffer.from('{"the":"text"}') }) - expect(sender.send, 'publish confirmed').to.have.been.called.once() }) it('SUBSCRIBE-to-remote-mqtt', function () { diff --git a/test/09.kv.js b/test/09.kv.js index 363a894..260ed7f 100644 --- a/test/09.kv.js +++ b/test/09.kv.js @@ -8,6 +8,7 @@ const promised = require('chai-as-promised') const FoxGate = require('../lib/hyper/gate') const Router = require('../lib/router') +const WampApi = require('../lib/wamp/api') const {MemBinder} = require('../lib/mono/membinder') chai.use(promised) @@ -17,19 +18,21 @@ describe('09. hyper-kv', function () { let router, gate, - sender, + sessionSender, realm, ctx, - session + session, + api beforeEach(function () { - sender = {} router = new Router(new MemBinder()) realm = router.createRealm('test-realm') gate = new FoxGate(router) session = gate.createSession() - ctx = gate.createContext(session, sender) + sessionSender = {} + ctx = gate.createContext(session, sessionSender) realm.joinSession(session) + api = realm.wampApi() }) afterEach(function () { @@ -39,48 +42,50 @@ describe('09. hyper-kv', function () { } }) - it('push-will', function () { - const api = realm.foxApi() + it('push-will:'/* + run.it */, async function () { + let expectedData = [ + { event: 'value' }, + { will: 'value' }, + ] - let n = 0 - const event = chai.spy((event, opt) => { - n++ - if (n === 1) { - expect(event).to.deep.equal({ kv: { event: 'value' } }) - } - if (n === 2) { - expect(event).to.deep.equal({ kv: { will: 'value' } }) - } + const event = chai.spy((id, args, kwargs) => { + expect(kwargs).to.deep.equal(expectedData.shift()) }) - api.subscribe(['will', 'test'], event) + await api.subscribe('will.test', event) - session.handle(ctx, { - ft: 'PUSH', - data: { kv: { event: 'value' } }, - uri: ['will', 'test'], - opt: { retain: true, will: { kv: { will: 'value' } } } - }) - expect(event).to.have.been.called.once() + let cli = new WampApi(realm, router.makeSessionId()) + realm.joinSession(cli) - session.cleanup() + await cli.publish( + 'will.test', + [], + { event: 'value' }, + { retain: true, will: { kv: { will: 'value' } } } + ) + + expect(event).to.have.been.called.once() + cli.cleanup() expect(event).to.have.been.called.twice() }) - it('push-watch-for-push', function () { + it('push-watch-for-push', async function () { + let curPromise let n = 0 - sender.send = chai.spy((msg) => { + sessionSender.send = (msg) => { n++ if (n === 1) { expect(msg).to.deep.equal({ id: 'init-kv', rsp: 'OK', data: undefined }) } else if (n === 2) { expect(msg).to.deep.equal({ id: 'watch-for-value', rsp: 'OK', data: undefined }) } - }) + curPromise() + curPromise = undefined + } const api = realm.foxApi() let m = 0 - const event = chai.spy((event, opt) => { + const onEvent = chai.spy((event, opt) => { m++ if (m === 1) { expect(event).to.deep.equal({ kv: { event: 'value' } }) @@ -88,21 +93,23 @@ describe('09. hyper-kv', function () { expect(event).to.deep.equal({ kv: { event: 'watch-for-empty' } }) } }) - api.subscribe(['watch', 'test'], event) - session.handle(ctx, { - ft: 'PUSH', - data: { kv: { event: 'value' } }, - uri: ['watch', 'test'], - opt: { - retain: true, - trace: true - }, - ack: true, - id: 'init-kv' + api.subscribe(['watch', 'test'], onEvent) + await new Promise((resolve) => { + curPromise = resolve + session.handle(ctx, { + ft: 'PUSH', + data: { kv: { event: 'value' } }, + uri: ['watch', 'test'], + opt: { + retain: true, + trace: true + }, + ack: true, + id: 'init-kv' + }) }) - expect(event).to.have.been.called.once() - expect(sender.send).to.have.been.called.once() + expect(onEvent).to.have.been.called.once() expect(realm.engine._messages.length).to.equal(1) session.handle(ctx, { @@ -118,19 +125,21 @@ describe('09. hyper-kv', function () { ack: true, id: 'watch-for-value' }) - expect(event).to.have.been.called.once() - expect(sender.send).to.have.been.called.once() + + expect(onEvent).to.have.been.called.once() expect(realm.engine._messages.length).to.equal(1) - api.publish(['watch', 'test'], null, { retain: true }) - expect(event).to.have.been.called.twice() - expect(sender.send).to.have.been.called.twice() + await new Promise((resolve) => { + curPromise = resolve + api.publish(['watch', 'test'], null, { retain: true }) + }) + expect(onEvent).to.have.been.called.twice() expect(realm.engine._messages.length).to.equal(2) }) it('push-watch-for-will', function () { let defer = [] - sender.send = chai.spy((msg) => {}) + sessionSender.send = chai.spy((msg) => {}) const api = realm.wampApi()