From 934461c17e74b5c1cfc55aa42ab6ac20d6a85837 Mon Sep 17 00:00:00 2001 From: Anatoly Tsapkov Date: Thu, 27 Feb 2020 23:17:40 -0500 Subject: [PATCH] dedicated KV storage --- README.md | 7 +- bin/with_auth.js | 10 +- ext/pubstate.js | 46 +++++++ lib/base_gate.js | 9 +- lib/context.js | 6 - lib/hyper/net_transport.js | 77 +++++++++++ lib/memkv.js | 88 +++++++++++++ lib/mqtt/gate.js | 102 +++++++------- lib/mqtt/transport.js | 14 +- lib/realm.js | 264 +++++++++++++++++++++---------------- lib/realm_error.js | 7 +- lib/router.js | 7 +- lib/tools.js | 29 +++- lib/wamp/api.js | 12 +- lib/wamp/dparse.js | 16 ++- lib/wamp/gate.js | 86 ++++++------ lib/wamp/msg.js | 24 ++++ package-lock.json | 28 +++- package.json | 3 +- test/mqtt.js | 90 ++++++++----- test/realm.js | 30 ++++- 21 files changed, 672 insertions(+), 283 deletions(-) create mode 100644 ext/pubstate.js create mode 100644 lib/hyper/net_transport.js create mode 100644 lib/memkv.js create mode 100644 lib/wamp/msg.js diff --git a/README.md b/README.md index f611871..9224298 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ The Web Application Message Server goal is to provide durable message source for Message router has pluggable interface to the several message protocols. As for now it could interact by * [WAMP V2 Basic Profile](http://wamp-proto.org/) -* [MQTT](http://mqtt.org/) +* [MQTT 3.1](http://mqtt.org/) It means that event could be send through MQTT interface and handled by WAMP client. @@ -90,11 +90,12 @@ has to maintain persistence of keys and provide the value as immediate first message for the subscription. And here what could be implemented ```javascript -publish('the.key', [ 'args' ], { kwArgs: false }, { +publish('the.key', [ 'args' ], { kwArgs: true }, { retain: 100, - weak: 'public', + // weak: 'public', when: { status: 'started' }, watch: false + will: { value: 'to', publish: 'at', session: 'disconnect' } }) ``` diff --git a/bin/with_auth.js b/bin/with_auth.js index 2d3d39e..4fd683c 100644 --- a/bin/with_auth.js +++ b/bin/with_auth.js @@ -20,16 +20,16 @@ const WampAuth = function () { this.ticket_auth = function (realmName, secureDetails, secret, extra, cb) { console.log('TICKET_AUTH:', secureDetails, secret, extra) app.getRealm(realmName, (realm) => { - let api = realm.wampApi() + const api = realm.wampApi() let found = false - api.subscribe('sys.user.info.'+secureDetails.authid, (id, args, kwargs) => { + api.subscribe('sys.user.info.' + secureDetails.authid, (id, args, kwargs) => { if (kwargs.password === secret) { cb(undefined, kwargs) } else { cb(new Error('authentication_failed')) } found = true - }).then((subId) => { + }, { retained: true }).then((subId) => { if (!found) { cb(new Error('authentication_failed')) } @@ -57,8 +57,8 @@ const WampAuth = function () { app.getRealm('realm1', function (realm) { var api = realm.wampApi() // create demo database - api.publish('sys.user.info.joe', [], { role: 'user', password: 'joe-secret' }, { retain:true }) - api.publish('sys.user.info.admin', [], { role: 'admin', password: 'admin-secret' }, { retain:true }) + api.publish('sys.user.info.joe', [], { role: 'user', password: 'joe-secret' }, { retain: true }) + api.publish('sys.user.info.admin', [], { role: 'admin', password: 'admin-secret' }, { retain: true }) }) console.log('Listening port:', program.port) diff --git a/ext/pubstate.js b/ext/pubstate.js new file mode 100644 index 0000000..fe03759 --- /dev/null +++ b/ext/pubstate.js @@ -0,0 +1,46 @@ +'use strict' + +const QueueClient = require('../lib/hyper/queueClient').QueueClient +const NetTransport = require('../lib/hyper/net_transport') + +const conf_realm_host = process.env.FC_REALM_HOST || '127.0.0.1' +const conf_realm_port = process.env.FC_REALM_PORT || 9300 + +let client = new QueueClient() + +let socket = NetTransport.createClientSocket(client) +let maxPrefix = undefined +let maxDate = undefined +let maxSegment = undefined + +async function worker () { + await new Promise((resolve, reject) => { + socket.connect(conf_realm_port, conf_realm_host, function () { + resolve() + }) + }) + + let loginDetails = await client.login({}) + console.log('loginDetails', loginDetails) + + await client.trace('getNewSegment', (data, task) => { + let newPrefix = data.date + data.segment + if (!maxPrefix || maxPrefix < newPrefix) { + maxPrefix = newPrefix + maxDate = data.date + maxSegment = data.segment + } + + let result = { date: maxDate, segment: maxSegment } + console.log('segment', maxPrefix, data, result) + + client.push('takeNewSegment', result) + task.resolve({}) + }) +} + +worker().then(function (value) { + console.log('worker OK:', value) +}, function (err) { + console.error('ERROR:', err, err.stack) +}) diff --git a/lib/base_gate.js b/lib/base_gate.js index b58a97c..58e9335 100644 --- a/lib/base_gate.js +++ b/lib/base_gate.js @@ -1,6 +1,5 @@ 'use strict' -const RealmError = require('./realm_error').RealmError const errorCodes = require('./realm_error').errorCodes const Session = require('./session') @@ -37,9 +36,11 @@ class BaseGate { return (typeof this._authHandler !== 'undefined' && typeof this._authHandler.authorize === 'function') } - checkAuthorize (ctx, funcClass, uri, id) { - if (this.isAuthorizeRequired() && !this._authHandler.authorize(ctx.getSession(), funcClass, uri)) { - ctx.error(id, errorCodes.ERROR_AUTHORIZATION_FAILED) + checkAuthorize (ctx, cmd, funcClass) { + if (this.isAuthorizeRequired() && + !this._authHandler.authorize(ctx.getSession(), funcClass, cmd.uri)) + { + ctx.sendError(cmd, errorCodes.ERROR_AUTHORIZATION_FAILED) return false } return true diff --git a/lib/context.js b/lib/context.js index a672b23..f2b71c6 100644 --- a/lib/context.js +++ b/lib/context.js @@ -4,8 +4,6 @@ // sender.send(msg, callback) // sender.close(code, reason) -const RealmError = require('./realm_error').RealmError - class Context { constructor (router, session, sender) { this.router = router @@ -29,10 +27,6 @@ class Context { emit (event, message, data) { this.router.emit(event, this.session, message, data) } - - error(id, code, msg) { - throw new RealmError(this.getId(), code, msg) - } } module.exports = Context diff --git a/lib/hyper/net_transport.js b/lib/hyper/net_transport.js new file mode 100644 index 0000000..d3f7405 --- /dev/null +++ b/lib/hyper/net_transport.js @@ -0,0 +1,77 @@ +'use strict' + +const net = require('net') +const msgpack = require('msgpack-lite') +const { SESSION_TX, SESSION_RX, SESSION_WARNING } = require('../messages') + +function ServerNetSender (socket, session, router) { + // var encodeStream = msgpack.createEncodeStream() + // encodeStream.pipe(socket) + this.send = function (msg, callback) { + router.emit(SESSION_TX, session, JSON.stringify(msg)) + socket.write(msgpack.encode(msg)) + // encodeStream.write(msg); + // encodeStream.end(); does not sending without end, but disconnections + } + + this.close = function (code, reason) { + socket.end() + } +} + +function NetServer (gate, options) { + let router = gate.getRouter() + let _server = net.Server(function (socket) { + let session = gate.createSession() + let sender = new ServerNetSender(socket, session, router) + let decodeStream = msgpack.createDecodeStream() + + socket.pipe(decodeStream).on('data', function (msg) { + let ctx = gate.createContext(session, sender) + try { + router.emit(SESSION_RX, session, JSON.stringify(msg)) + session.handle(ctx, msg) + } catch (e) { + router.emit(SESSION_WARNING, session, 'invalid message', msg) + session.close(1003, 'protocol violation') + console.log(e) + } + }) + + socket.on('end', function () { + }) + + socket.on('close', function () { + session.cleanup() + }) + }) + _server.listen(options) + + return _server +} + +function ClientNetSender (socket) { + this.send = function (msg, callback) { + socket.write(msgpack.encode(msg)) + } + + this.close = function (code, reason) { + socket.end() + } +} + +function createClientSocket (client) { + var socket = new net.Socket() + var sender = new ClientNetSender(socket) + client.sender = sender + + let decodeStream = msgpack.createDecodeStream() + + socket.pipe(decodeStream).on('data', function (msg) { + client.handle(msg) + }) + return socket +} + +exports.NetServer = NetServer +exports.createClientSocket = createClientSocket diff --git a/lib/memkv.js b/lib/memkv.js new file mode 100644 index 0000000..75152fe --- /dev/null +++ b/lib/memkv.js @@ -0,0 +1,88 @@ +'use strict' + +const { match, extract, defaultParse, restoreUri } = require('./topic_pattern') +const errorCodes = require('./realm_error').errorCodes +const KeyValueStorageAbstract = require('./realm').KeyValueStorageAbstract + +class MemKeyValueStorage extends KeyValueStorageAbstract { + constructor () { + super() + this._keyDb = new Map() + } + + getKey (uri, cbRow) { + return new Promise((resolve, reject) => { + for (let [key, item] of this._keyDb) { + const aKey = defaultParse(key) + if (match(aKey, uri)) { + cbRow(aKey, item[1] /* data */) + } + } + resolve() + }) + } + + setKeyActor (actor) { + const opt = actor.getOpt() + const suri = restoreUri(extract(actor.getUri(), this.getUriPattern())) + const data = actor.getData() + + const pub = () => { + if (actor) { + this.pubActor(actor) + } + if (data === null) { + this._keyDb.delete(suri) + } else { + this._keyDb.set(suri, [actor.getSid(), data]) + } + } + const row = this._keyDb.get(suri) + + if ('when' in opt) { + if (opt.when === null) { + + if (undefined === row) { + pub() + return + } + const [sid, val, when] = row + + if (val === null) { + pub() + return + } + + actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'Found value is not empty') + return + } + + if (undefined !== row) { + const [sid, val, when] = row + if (val !== null) { + pub() + return + } + } + actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'Value is empty') + return + } + + pub() + } + + removeSession (sessionId) { + let toRemove = [] + for (let key in this._keyDb) { + const keySessionId = this._keyDb.get(key)[0] + if (keySessionId === sessionId) { + toRemove.push(key) + } + } + for (let i = 0; i < toRemove.length; i++) { + this.setKeyData(toRemove[i], null) + } + } +} + +exports.MemKeyValueStorage = MemKeyValueStorage diff --git a/lib/mqtt/gate.js b/lib/mqtt/gate.js index fd05843..a767e55 100644 --- a/lib/mqtt/gate.js +++ b/lib/mqtt/gate.js @@ -3,6 +3,7 @@ const { SESSION_WARNING } = require('../messages') const BaseGate = require('../base_gate') const RealmError = require('../realm_error').RealmError +const errorCodes = require('../realm_error').errorCodes const { mqttParse, restoreMqttUri } = require('../topic_pattern') const Context = require('../context') @@ -80,17 +81,19 @@ class MqttContext extends Context { session.setLastPublishedId(cmd.qid) let payload = '' + if (cmd.data === null) { + payload = Buffer.alloc(0) + } else if (cmd.data.payload !== undefined) { payload = cmd.data.payload } else - if (cmd.data.args !== undefined) { - payload = Buffer.from(JSON.stringify(cmd.data.args)) - } else - if (cmd.data.kwargs !== undefined) { - payload = Buffer.from(JSON.stringify(cmd.data.kwargs)) - } else if (cmd.data.kv !== undefined) { payload = Buffer.from(JSON.stringify(cmd.data.kv)) + } else + if (cmd.data.args instanceof Array && cmd.data.args.length === 0 && cmd.data.kwargs !== undefined) { + payload = Buffer.from(JSON.stringify(cmd.data.kwargs)) + } else { + payload = Buffer.from(JSON.stringify(cmd.data)) } this.mqttSend({ topic: restoreMqttUri(cmd.uri), @@ -113,14 +116,14 @@ class MqttContext extends Context { this.sender.close(code, reason) } - error (id, code, msg) { - if (undefined === id) { - throw new RealmError(this.getId(), code, msg) + sendError (cmd, code, text) { + if (undefined === cmd.id) { + throw new RealmError(this.getId(), code, text) } else { - // subscribe mode + // subscribe error mode const pkg = this.getId() - pkg.granted[id] = 0x80 + pkg.granted[cmd.id] = 0x80 this.mqttSubscribeDone() } } @@ -146,7 +149,7 @@ class MqttGate extends BaseGate { checkRealm (session, requestId) { if (!session.realm) { - throw new RealmError(requestId, 'wamp.error.not_authorized') + throw new RealmError(requestId, errorCodes.ERROR_NOT_AUTHORIZED) } } @@ -176,7 +179,7 @@ class MqttGate extends BaseGate { connect (ctx, session, message) { let result if (typeof message.username === 'string') { - result = message.username.match(/(.*)@([a-z0-9]*)$/i) + result = message.username.match(/(.*)@([a-zA-Z0-9-]*)$/i) } session.secureDetails = {} if (result) { @@ -220,22 +223,20 @@ class MqttGate extends BaseGate { opt.trace = true } - let uri = mqttParse(message.topic) - this.checkAuthorize(ctx, 'publish', uri, message.messageId) - - let data - if (message.payload.length === 0) { - data = null - } else { - data = { payload: message.payload } - } - return { + const cmd = { mtype: 'publish', - uri, - data, + uri: mqttParse(message.topic), + data: (message.payload.length === 0 + ? null + : { payload: message.payload } + ), id: message.messageId, opt - } + } + if (this.checkAuthorize(ctx, cmd, 'publish')) { + return cmd + } + return false } handle (ctx, session, msg) { @@ -254,8 +255,8 @@ class MqttGate extends BaseGate { handlers[mtype].call(this, ctx, session, msg) } catch (err) { if (err instanceof RealmError) { - ctx.mqttClose(1003, err.message) console.log(err) + ctx.mqttClose(1003, err.message) } else { throw err } @@ -279,7 +280,7 @@ handlers.connect = function (ctx, session, message) { handlers.disconnect = function (ctx, session, message) { // do not send WILL notification at correct disconnect session.cleanDisconnectPublish() - return false + ctx.mqttClose(1000, 'Server closed session') } handlers.publish = function (ctx, session, message) { @@ -305,10 +306,8 @@ handlers.puback = function (ctx, session, message) { id: qid }) if (session.secureDetails && session.secureDetails.clientId) { - session.realm.setKey( + session.realm.setKeyData( ['$FOX', 'clientOffset', session.secureDetails.clientId], - 0, // session.sessionId, - qid, qid ) } @@ -321,7 +320,7 @@ handlers.pingreq = function (ctx, session, message) { handlers.subscribe = function (ctx, session, message) { this.checkRealm(session) - let pkg = { + const pkg = { id: message.messageId, granted: [], count: message.subscriptions.length @@ -329,26 +328,27 @@ handlers.subscribe = function (ctx, session, message) { ctx.setId(pkg) const afterId = session.getLastPublishedId() for (let index=0; index < message.subscriptions.length; index++) { - let qos = Math.min(message.subscriptions[index].qos, 1) + const qos = Math.min(message.subscriptions[index].qos, 1) pkg.granted[index] = qos - let uri = mqttParse(message.subscriptions[index].topic) - if (this.checkAuthorize(ctx, 'subscribe', uri, index)) { - let opt = {} - if (qos > 0) { - opt.keepTraceFlag = true - } - if (afterId) { - opt.after = afterId - } - if (message.retain) { - opt.retained = true - } - session.realm.doTrace(ctx, { - mtype: 'subscribe', - id: index, - uri, - opt - }) + const uri = mqttParse(message.subscriptions[index].topic) + const opt = {} + if (qos > 0) { + opt.keepTraceFlag = true + } + if (afterId) { + opt.after = afterId + } + if (message.retain) { + opt.retained = true + } + const cmd = { + mtype: 'subscribe', + id: index, + uri, + opt + } + if (this.checkAuthorize(ctx, cmd, 'subscribe')) { + session.realm.doTrace(ctx, cmd) } } } diff --git a/lib/mqtt/transport.js b/lib/mqtt/transport.js index 09ed96e..6b840b0 100644 --- a/lib/mqtt/transport.js +++ b/lib/mqtt/transport.js @@ -10,21 +10,21 @@ function MqttSender (socket, session, router) { socket.write(generate(data)) } - this.close = function (code, reason) { + this.close = function () { socket.end() } } function MqttServer (gate, options) { - let router = gate.getRouter() - let _server = net.Server(function (socket) { - let session = gate.createSession() - let sender = new MqttSender(socket, session, router) + const router = gate.getRouter() + const _server = net.Server(function (socket) { + const session = gate.createSession() + const sender = new MqttSender(socket, session, router) - let parser = ParserBuild() + const parser = ParserBuild() parser.on('packet', function (data) { - let ctx = gate.createContext(session, sender) + const ctx = gate.createContext(session, sender) router.emit('session.Rx', session, data) session.handle(ctx, data) }) diff --git a/lib/realm.js b/lib/realm.js index 7c2cc94..85c6b59 100644 --- a/lib/realm.js +++ b/lib/realm.js @@ -6,7 +6,7 @@ const EventEmitter = require('events').EventEmitter const { SESSION_JOIN, SESSION_LEAVE, RESULT_EMIT, ON_SUBSCRIBED, ON_UNSUBSCRIBED, ON_REGISTERED, ON_UNREGISTERED } = require('./messages') -const { match, intersect, merge, extract, defaultParse, restoreUri } = require('./topic_pattern') +const { match, intersect, merge, extract, restoreUri } = require('./topic_pattern') const errorCodes = require('./realm_error').errorCodes const RealmError = require('./realm_error').RealmError const WampApi = require('./wamp/api') @@ -41,7 +41,11 @@ class Actor { } acknowledged () { - this.ctx.acknowledged(this.msg) + return this.ctx.acknowledged(this.msg) + } + + reject (errorCode, text) { + return this.ctx.sendError(this.msg, errorCode, text) } getSid () { @@ -94,7 +98,7 @@ class ActorCall extends Actor { rsp: actor.msg.rqt }) - let subD = this.getRegistration() + const subD = this.getRegistration() subD.taskResolved() if (subD.isAble()) { this.engine.checkTasks(subD) @@ -181,6 +185,11 @@ class ActorReg extends ActorTrace { } class ActorPush extends Actor { + constructor (ctx, msg) { + super(ctx, msg) + this.clientNotified = false + } + setEventId (eventId) { this.eventId = eventId } @@ -236,7 +245,7 @@ class DeferMap { } getDefer (sid, markId) { - let result = this.defers.get(markId) + const result = this.defers.get(markId) if (result && result.destSID.hasOwnProperty(sid)) { return result } else { @@ -245,7 +254,7 @@ class DeferMap { } doneDefer (sid, markId) { - let found = this.defers.get(markId) + const found = this.defers.get(markId) if (found && found.destSID.hasOwnProperty(sid)) { this.defers.delete(markId) } @@ -269,11 +278,6 @@ class BaseEngine { this.wTrace = new Qlobber() // [uri][subscription] this.qConf = new DeferMap() - this.kvo = [] // key value order - } - - registerKeyValueEngine (uri, kv) { - this.kvo.push({ uri, kv }) } getSubStack (uri) { @@ -335,11 +339,11 @@ class BaseEngine { } doCall (taskD) { - let strUri = restoreUri(taskD.getUri()) - let queue = this.getSubStack(strUri) + const strUri = restoreUri(taskD.getUri()) + const queue = this.getSubStack(strUri) let subExists = false for (let index in queue) { - let subD = queue[index] + const subD = queue[index] subExists = true if (subD.isAble()) { subD.callWorker(taskD) @@ -350,7 +354,7 @@ class BaseEngine { if (!subExists) { throw new RealmError(taskD.msg.id, errorCodes.ERROR_NO_SUCH_PROCEDURE, - ['no callee registered for procedure <' + strUri + '>'] + 'no callee registered for procedure <' + strUri + '>' ) } @@ -394,20 +398,6 @@ class BaseEngine { actor.traceStarted = true actor.flushDelayStack() } - - if (actor.getOpt().retained) { - this.getKey( - actor.getUri(), - (key, data) => { - actor.sendEvent({ - qid: null, - uri: key, - data: data, - opt: { retained: true } - }) - } - ) - } } actorPush (subD, event) { @@ -429,7 +419,6 @@ class BaseEngine { // If supported by the Broker, this behavior can be overridden via the option exclude_me set to false. dispatch (actor) { actor.destSID = {} - actor.clientNotified = false let found = this.matchTrace(actor.getUri()) for (let i = 0; i < found.length; i++) { @@ -454,40 +443,6 @@ class BaseEngine { this.actorConfirm(actor, cmd) } - setKey (uri, sessionId, data, messageId) { - for (let i = this.kvo.length - 1; i >= 0; i--) { - const kvr = this.kvo[i] - if (match(uri, kvr.uri)) { - kvr.kv.setKey(extract(uri,kvr.uri), sessionId, data, messageId) - break - } - } - } - - removeSession (sessionId) { - for (let i = this.kvo.length - 1; i >= 0; i--) { - this.kvo[i].kv.removeSession(sessionId) - } - } - - getKey (uri, cbRow) { - const done = [] - for (let i = this.kvo.length - 1; i >= 0; i--) { - const kvr = this.kvo[i] - // console.log('MATCH++', uri, kvr.uri, extract(uri, kvr.uri)) - if (intersect(uri, kvr.uri)) { - done.push(kvr.kv.getKey( - extract(uri, kvr.uri), - (aKey, data, messageId) => { - // console.log('GOT', aKey, data) - cbRow(merge(aKey, kvr.uri), data, messageId) - } - )) - } - } - return Promise.all(done) - } - getHistoryAfter (after, uri, cbRow) { return new Promise(() => {}) } } @@ -501,6 +456,7 @@ class BaseRealm extends EventEmitter { this._router = router this._realmName = realmName this.engine = engine + this.kvo = [] // key value order } getRouter () { @@ -515,7 +471,7 @@ class BaseRealm extends EventEmitter { createActorPush (ctx, cmd) { return new ActorPush (ctx, cmd) } doEcho (ctx, cmd) { - let a = this.createActorEcho(ctx, cmd) + const a = this.createActorEcho(ctx, cmd) a.acknowledged() } @@ -605,6 +561,19 @@ class BaseRealm extends EventEmitter { session.addTrace(cmd.qid, subscription) this.engine.doTrace(subscription) + if (subscription.getOpt().retained) { + this.getKey( + subscription.getUri(), + (key, data) => { + subscription.sendEvent({ + qid: null, + uri: key, + data: data, + opt: { retained: true } + }) + } + ) + } this.emit(ON_SUBSCRIBED, subscription) return cmd.qid @@ -625,14 +594,10 @@ class BaseRealm extends EventEmitter { doPush (ctx, cmd) { const actor = this.createActorPush(ctx, cmd) - this.engine.doPush(actor) - - if (cmd.opt && cmd.opt.hasOwnProperty('retain')) { - let sessionId = 0 - if (cmd.opt.hasOwnProperty('weak')) { - sessionId = actor.getSid() - } - this.engine.setKey(cmd.uri, sessionId, cmd.data, actor.eventId) + if (cmd.opt && cmd.opt.retain) { + this.setKeyActor(actor) + } else { + this.engine.doPush(actor) } } @@ -653,7 +618,11 @@ class BaseRealm extends EventEmitter { this.emit(SESSION_LEAVE, session) session.cleanupTrace(this.engine) session.cleanupReg(this.engine) - this.engine.removeSession(session.sessionId) + + for (let i = this.kvo.length - 1; i >= 0; i--) { + this.kvo[i].kv.removeSession(session.sessionId) + } + this._sessions.delete(session.sessionId) session.setRealm(null) } @@ -663,8 +632,8 @@ class BaseRealm extends EventEmitter { } getSessionIds () { - var result = [] - for (var [sId, session] of this._sessions) { + let result = [] + for (let [sId, session] of this._sessions) { result.push(session.sessionId) } return result @@ -695,65 +664,125 @@ class BaseRealm extends EventEmitter { } registerKeyValueEngine (uri, kv) { - this.engine.registerKeyValueEngine(uri, kv) + kv.setUriPattern(uri) + kv.pubActor = (actor) => { + this.engine.doPush(actor) + } + kv.confirm = (actor, cmd) => { + actor.confirm(cmd) + } + this.kvo.push({ uri, kv }) } - setKey (uri, sessionId, data, messageId) { - this.engine.setKey(uri, sessionId, data, messageId) + setKeyData (uri, data) { + for (let i = this.kvo.length - 1; i >= 0; i--) { + const kvr = this.kvo[i] + if (match(uri, kvr.uri)) { + kvr.kv.setKeyData(extract(uri, kvr.uri), data) + break + } + } + } + + setKeyActor (actor) { + const uri = actor.getUri() + for (let i = this.kvo.length - 1; i >= 0; i--) { + const kvr = this.kvo[i] + if (match(uri, kvr.uri)) { + kvr.kv.setKeyActor(actor) + break + } + } } getKey (uri, cbRow) { - return this.engine.getKey(uri, cbRow) + const done = [] + for (let i = this.kvo.length - 1; i >= 0; i--) { + const kvr = this.kvo[i] + // console.log('MATCH++', uri, kvr.uri, extract(uri, kvr.uri)) + if (intersect(uri, kvr.uri)) { + done.push(kvr.kv.getKey( + extract(uri, kvr.uri), + (aKey, data) => { + cbRow(merge(aKey, kvr.uri), data) + } + )) + } + } + return Promise.all(done) } } -class KeyValueStorageAbstract { - // Promise:getKey (uri, cbRow) ::cbRow:: aKey, data - // removeSession (sessionId) +class ActorPushKv { + constructor (uri, data, opt) { + this.uri = uri + this.data = data + this.opt = opt + this.eventId = null + } + + getSid () { + return null + } + + getOpt () { + return this.opt + } + + getUri () { + return this.uri + } + + getData () { + return this.data + } + + setEventId (id) { + this.eventId = id + } + + getEvent () { + return { + qid: this.eventId, + uri: this.getUri(), + data: this.getData(), + opt: this.getOpt() + } + } + + confirm () {} } -class MemKeyValueStorage extends KeyValueStorageAbstract { +class KeyValueStorageAbstract { constructor () { - super() - this._keyDb = new Map() + this.uriPattern = '#' } - getKey (uri, cbRow) { - return new Promise((resolve, reject) => { - for (let [key, item] of this._keyDb) { - const aKey = defaultParse(key) - if (match(aKey, uri)) { - cbRow(aKey, item[1] /* data */) - } - } - resolve() - }) + setUriPattern (uriPattern) { + this.uriPattern = uriPattern } - setKey (uri, sessionId, data, messageId) { - if (data === null) { - this.removeKey(uri) - } else { - this._keyDb.set(restoreUri(uri), [sessionId, data, messageId]) - } + getUriPattern () { + return this.uriPattern } - removeKey (key) { - this._keyDb.delete(restoreUri(key)) + createActorPushKv (uri, data, opt) { + return new ActorPushKv(uri, data, opt) } - removeSession (sessionId) { - let toRemove = [] - for (let key in this._keyDb) { - const keySessionId = this._keyDb.get(key)[0] - if (keySessionId === sessionId) { - toRemove.push(key) - } - } - for (let i = 0; i < toRemove.length; i++) { - this.removeKey(toRemove[i]) - } + // pubActor (actor) virtual abstract + + setKeyData (key, data) { + this.setKeyActor( + this.createActorPushKv( + merge(key, this.uriPattern), + data, + {}) + ) } + + // Promise:getKey (uri, cbRow) ::cbRow:: aKey, data + // removeSession (sessionId) } class MemEngine extends BaseEngine { @@ -763,6 +792,11 @@ class MemEngine extends BaseEngine { this._messages = [] } + dispatch (actor) { + // event.qid = ++this._messageGen + return super.dispatch(actor) + } + doPush (actor) { actor.setEventId(++this._messageGen) super.doPush(actor) @@ -793,8 +827,8 @@ exports.ActorCall = ActorCall exports.ActorTrace = ActorTrace exports.ActorPush = ActorPush +exports.ActorPushKv = ActorPushKv exports.KeyValueStorageAbstract = KeyValueStorageAbstract -exports.MemKeyValueStorage = MemKeyValueStorage exports.BaseEngine = BaseEngine exports.MemEngine = MemEngine exports.BaseRealm = BaseRealm diff --git a/lib/realm_error.js b/lib/realm_error.js index f945e37..ecfe916 100644 --- a/lib/realm_error.js +++ b/lib/realm_error.js @@ -7,7 +7,12 @@ let errorCodes = { // body queue error codes ERROR_HEADER_IS_NOT_COMPLETED : 107, - ERROR_AUTHORIZATION_FAILED : 108 + ERROR_AUTHORIZATION_FAILED : 108, + ERROR_NOT_AUTHORIZED : 109, + + ERROR_INVALID_PAYLOAD : 110, + ERROR_INVALID_URI : 111, + ERROR_INVALID_ARGUMENT : 112 } class RealmError extends Error { diff --git a/lib/router.js b/lib/router.js index 71cd306..5598f4b 100644 --- a/lib/router.js +++ b/lib/router.js @@ -6,7 +6,7 @@ const { REALM_CREATED, SESSION_TX, SESSION_RX, SESSION_WARNING } = require('./me const tools = require('./tools') const BaseRealm = require('./realm').BaseRealm const MemEngine = require('./realm').MemEngine -const MemKeyValueStorage = require('./realm').MemKeyValueStorage +const MemKeyValueStorage = require('./memkv').MemKeyValueStorage class Router extends EventEmitter { constructor () { @@ -71,8 +71,9 @@ class Router extends EventEmitter { createRealm (realmName) { const engine = new MemEngine() - engine.registerKeyValueEngine(['#'], new MemKeyValueStorage()) - return new BaseRealm(this, realmName, engine) + const realm = new BaseRealm(this, realmName, engine) + realm.registerKeyValueEngine(['#'], new MemKeyValueStorage()) + return realm } getRealm (realmName, callback) { diff --git a/lib/tools.js b/lib/tools.js index ffb883e..b040d5c 100644 --- a/lib/tools.js +++ b/lib/tools.js @@ -1,3 +1,28 @@ -module.exports.randomId = function () { - return require('crypto').randomBytes(6).readUIntBE(0, 6) +const crypto = require('crypto') + +function randomId () { + return crypto.randomBytes(6).readUIntBE(0, 6) } + +function keyDate (date) { + const month = date.getUTCMonth() + 1 + const day = date.getUTCDate() + const hour = date.getUTCHours() + const minutes = date.getUTCMinutes() + + return date.getUTCFullYear().toString().substr(-2) + + (month < 10 ? '0' + month : month) + + (day < 10 ? '0' + day : day) + + (hour < 10 ? '0' + hour : hour) + + (minutes < 10 ? '0' + minutes : minutes) +} + +function keyId (id) { + const idStr = id.toString(36) + return String.fromCharCode(idStr.length + 96) + + idStr +} + +module.exports.randomId = randomId +module.exports.keyDate = keyDate +module.exports.keyId = keyId diff --git a/lib/wamp/api.js b/lib/wamp/api.js index 7dc3486..04b35d6 100644 --- a/lib/wamp/api.js +++ b/lib/wamp/api.js @@ -10,7 +10,7 @@ const Context = require('../context') class WampApiContext extends Context { sendInvoke (cmd) { - let [args, kwargs] = dparse(cmd.data) + const [args, kwargs] = dparse(cmd.data) cmd.id(cmd.qid, args, kwargs, cmd.opt) } @@ -19,12 +19,12 @@ class WampApiContext extends Context { if (cmd.rsp === RESULT_EMIT) { resOpt.progress = true } - let [args, kwargs] = dparse(cmd.data) + const [args, kwargs] = dparse(cmd.data) cmd.id(cmd.err, args, kwargs, resOpt) } sendEvent (cmd) { - let [args, kwargs] = dparse(cmd.data) + const [args, kwargs] = dparse(cmd.data) cmd.id.cb(cmd.qid, args, kwargs, cmd.opt) } @@ -34,6 +34,12 @@ class WampApiContext extends Context { cmd.id.resolve(cmd.qid) } } + + sendError (cmd, code, text) { + if (cmd.id && cmd.id.reject) { + cmd.id.reject({ error: code, message: text }) + } + } } function WampApi (realm) { diff --git a/lib/wamp/dparse.js b/lib/wamp/dparse.js index c69d7a6..75a301d 100644 --- a/lib/wamp/dparse.js +++ b/lib/wamp/dparse.js @@ -3,23 +3,25 @@ // Manage args + kwargs module.exports = function (data) { let args, kwargs + if (data === null) { + args = [] + kwargs = undefined + } else if (data.args !== undefined) { args = data.args kwargs = data.kwargs } else if (data.payload !== undefined) { - let payload = JSON.parse(data.payload) if (data instanceof Array) { - args = payload - kwargs = {} - } - else { + args = JSON.parse(data.payload) + kwargs = undefined + } else { args = [] - kwargs = payload + kwargs = JSON.parse(data.payload) } } else { - args = [] // args + args = [] kwargs = data.kv } return [args, kwargs] diff --git a/lib/wamp/gate.js b/lib/wamp/gate.js index a5a02e8..6ce8a4d 100644 --- a/lib/wamp/gate.js +++ b/lib/wamp/gate.js @@ -2,23 +2,13 @@ const WAMP = require('./protocol') const dparse = require('./dparse') +const errorMessages = require('./msg').errorMessages const { wampParse, restoreUri } = require('../topic_pattern') const { RESULT_EMIT, RESULT_OK } = require('../messages') const BaseGate = require('../base_gate') const RealmError = require('../realm_error').RealmError -const errorCodes = require('../realm_error').errorCodes const Context = require('../context') -let errorMessages = {} - -// A Dealer could not perform a call, since not procedure is currently registered under the given URI. -errorMessages[errorCodes.ERROR_NO_SUCH_PROCEDURE] = 'wamp.error.no_such_procedure' - -// A Dealer could not perform a unregister, since the given registration is not active. -errorMessages[errorCodes.ERROR_NO_SUCH_REGISTRATION] = 'wamp.error.no_such_registration' - -errorMessages[errorCodes.ERROR_AUTHORIZATION_FAILED] = 'wamp.error.authorization_failed' - let handlers = {} let cmdAck = {} @@ -80,7 +70,11 @@ class WampContext extends Context { cmdAck[cmd.wtype].call(this, cmd) } - wampSendError (cmd, requestId, errorCode, args) { + sendError (cmd, errorCode, text) { + return this.wampSendError(cmd.wtype, cmd.id, errorCode, text) + } + + wampSendError (mtype, requestId, errorCode, text) { if (requestId) { // do not send on disconnect let wampCode if (errorMessages[errorCode]) { @@ -89,9 +83,9 @@ class WampContext extends Context { wampCode = errorCode } - var msg = [WAMP.ERROR, cmd, requestId, {}, wampCode] - if (args) { - msg.push(args) + var msg = [WAMP.ERROR, mtype, requestId, {}, wampCode] + if (text) { + msg.push([text]) } this.wampSend(msg) @@ -247,7 +241,7 @@ class WampGate extends BaseGate { handlers[mtype].call(this, ctx, session, msg) } catch (err) { if (err instanceof RealmError) { - ctx.wampSendError(mtype, err.requestId, err.code, [err.message]) + ctx.wampSendError(mtype, err.requestId, err.code, err.message) } else { throw err } @@ -260,8 +254,8 @@ class WampGate extends BaseGate { } handlers[WAMP.HELLO] = function (ctx, session, message) { - let realmName = message.shift() - let details = message.shift() + const realmName = message.shift() + const details = message.shift() if (session.realm === null) { this.hello(ctx, session, realmName, details) } else { @@ -271,8 +265,8 @@ handlers[WAMP.HELLO] = function (ctx, session, message) { } handlers[WAMP.AUTHENTICATE] = function (ctx, session, message) { - let secret = message.shift() - let extra = message.shift() + const secret = message.shift() + const extra = message.shift() if (session.realm === null) { this.authenticate(ctx, session, secret, extra) } else { @@ -291,7 +285,7 @@ handlers[WAMP.REGISTER] = function (ctx, session, message) { ctx.setId(id) this.checkRealm(session, id) -// this.checkAuthorize(ctx, 'register', uri) +// this.checkAuthorize(ctx, cmd, 'register') let opt = {} if (Number.isInteger(inopt.concurrency)) { @@ -319,12 +313,20 @@ handlers[WAMP.CALL] = function (ctx, session, message) { var opt = message.shift() || {} var uri = wampParse(message.shift()) var args = message.shift() || [] - var kwargs = message.shift() || {} + var kwargs = message.shift() || null ctx.setId(id) this.checkRealm(session, id) - // this.checkAuthorize(ctx, 'call', uri) - let cmd = { id, uri, opt: {}, data: { args, kwargs } } + // this.checkAuthorize(ctx, cmd, 'call') + let cmd = { + id, + uri, + opt: {}, + data: (kwargs === null && args instanceof Array && args.length === 0 + ? null + : { args, kwargs } + ) + } if (opt.receive_progress) { cmd.opt.receive_progress = true @@ -365,19 +367,21 @@ handlers[WAMP.YIELD] = function (ctx, session, message) { } handlers[WAMP.SUBSCRIBE] = function (ctx, session, message) { - let id = message.shift() - let opt = message.shift() - let uri = wampParse(message.shift()) + const id = message.shift() + const opt = message.shift() + const uri = wampParse(message.shift()) ctx.setId(id) this.checkRealm(session, id) - this.checkAuthorize(ctx, 'subscribe', uri) - session.realm.doTrace(ctx, { + const cmd = { wtype: WAMP.SUBSCRIBE, id, uri, opt - }) + } + if (this.checkAuthorize(ctx, cmd, 'subscribe')) { + session.realm.doTrace(ctx, cmd) + } } cmdAck[WAMP.SUBSCRIBE] = function (cmd) { @@ -399,19 +403,22 @@ cmdAck[WAMP.UNSUBSCRIBE] = function (cmd) { } handlers[WAMP.PUBLISH] = function (ctx, session, message) { - let id = message.shift() - let opt = message.shift() || {} - let uri = wampParse(message.shift()) - let args = message.shift() || [] - let kwargs = message.shift() || {} + const id = message.shift() + const opt = message.shift() || {} + const uri = wampParse(message.shift()) + const args = message.shift() || [] + const kwargs = message.shift() || null ctx.setId(id) - let cmd = { + const cmd = { wtype: WAMP.PUBLISH, id, uri, - data: { args, kwargs } + data: (kwargs === null && args instanceof Array && args.length === 0 + ? null + : { args, kwargs } + ) } if (opt.acknowledge) { @@ -426,8 +433,9 @@ handlers[WAMP.PUBLISH] = function (ctx, session, message) { cmd.opt = opt this.checkRealm(session, id) - this.checkAuthorize(ctx, 'publish', uri) - session.realm.doPush(ctx, cmd) + if (this.checkAuthorize(ctx, cmd, 'publish')) { + session.realm.doPush(ctx, cmd) + } } cmdAck[WAMP.PUBLISH] = function (cmd) { diff --git a/lib/wamp/msg.js b/lib/wamp/msg.js new file mode 100644 index 0000000..983681e --- /dev/null +++ b/lib/wamp/msg.js @@ -0,0 +1,24 @@ +'use strict' + +const errorCodes = require('../realm_error').errorCodes + +const errorMessages = {} + +// A Dealer could not perform a call, since not procedure is currently registered under the given URI. +errorMessages[errorCodes.ERROR_NO_SUCH_PROCEDURE] = 'wamp.error.no_such_procedure' + +// A Dealer could not perform a unregister, since the given registration is not active. +errorMessages[errorCodes.ERROR_NO_SUCH_REGISTRATION] = 'wamp.error.no_such_registration' + +errorMessages[errorCodes.ERROR_AUTHORIZATION_FAILED] = 'wamp.error.authorization_failed' +errorMessages[errorCodes.ERROR_NOT_AUTHORIZED] = 'wamp.error.not_authorized' +errorMessages[errorCodes.ERROR_INVALID_PAYLOAD] = 'wamp.error.invalid_payload' +errorMessages[errorCodes.ERROR_INVALID_URI] = 'wamp.error.invalid_uri' +errorMessages[errorCodes.ERROR_INVALID_ARGUMENT] = 'wamp.error.invalid_argument' + +// wamp.error.authentication_failed +// wamp.error.no_such_role +// wamp.error.no_such_realm +// wamp.error.no_auth_method + +exports.errorMessages = errorMessages diff --git a/package-lock.json b/package-lock.json index aa79efa..48b3f47 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "fox-wamp", - "version": "0.7.1", + "version": "0.7.2", "lockfileVersion": 1, "requires": true, "dependencies": { @@ -472,6 +472,11 @@ "integrity": "sha1-Cr9PHKpbyx96nYrMbepPqqBLrJs=", "dev": true }, + "event-lite": { + "version": "0.1.2", + "resolved": "https://registry.npmjs.org/event-lite/-/event-lite-0.1.2.tgz", + "integrity": "sha512-HnSYx1BsJ87/p6swwzv+2v6B4X+uxUteoDfRxsAb1S1BePzQqOLevVmkdA15GHJVd9A9Ok6wygUR18Hu0YeV9g==" + }, "external-editor": { "version": "3.0.3", "resolved": "https://registry.npmjs.org/external-editor/-/external-editor-3.0.3.tgz", @@ -589,6 +594,11 @@ "safer-buffer": ">= 2.1.2 < 3" } }, + "ieee754": { + "version": "1.1.13", + "resolved": "https://registry.npmjs.org/ieee754/-/ieee754-1.1.13.tgz", + "integrity": "sha512-4vf7I2LYV/HaWerSo3XmlMkp5eZ83i+/CDluXi/IGTs/O1sejBNhTtnxzmRZfvOUqj7lZjqHkeTvpgSFDlWZTg==" + }, "ignore": { "version": "4.0.6", "resolved": "https://registry.npmjs.org/ignore/-/ignore-4.0.6.tgz", @@ -664,6 +674,11 @@ } } }, + "int64-buffer": { + "version": "0.1.10", + "resolved": "https://registry.npmjs.org/int64-buffer/-/int64-buffer-0.1.10.tgz", + "integrity": "sha1-J3siiofZWtd30HwTgyAiQGpHNCM=" + }, "is-fullwidth-code-point": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/is-fullwidth-code-point/-/is-fullwidth-code-point-2.0.0.tgz", @@ -792,6 +807,17 @@ "integrity": "sha512-tgp+dl5cGk28utYktBsrFqA7HKgrhgPsg6Z/EfhWI4gl1Hwq8B/GmY/0oXZ6nF8hDVesS/FpnYaD/kOWhYQvyg==", "dev": true }, + "msgpack-lite": { + "version": "0.1.26", + "resolved": "https://registry.npmjs.org/msgpack-lite/-/msgpack-lite-0.1.26.tgz", + "integrity": "sha1-3TxQsm8FnyXn7e42REGDWOKprYk=", + "requires": { + "event-lite": "^0.1.1", + "ieee754": "^1.1.8", + "int64-buffer": "^0.1.9", + "isarray": "^1.0.0" + } + }, "msgpack5": { "version": "4.2.1", "resolved": "https://registry.npmjs.org/msgpack5/-/msgpack5-4.2.1.tgz", diff --git a/package.json b/package.json index bc1cdfe..420d3a3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fox-wamp", - "version": "0.7.1", + "version": "0.7.2", "description": "Web Application Message Router/Server WAMP/MQTT", "author": { "name": "Anatoly Tsapkov", @@ -26,6 +26,7 @@ "jsonschema": "^1.2.5", "mqtt-packet": "^6.3.0", "node-statsd": "*", + "msgpack-lite": "*", "qlobber": "^3.1.0", "ws": "^6.2.1" }, diff --git a/test/mqtt.js b/test/mqtt.js index 50b0798..0fb49b6 100644 --- a/test/mqtt.js +++ b/test/mqtt.js @@ -24,7 +24,7 @@ describe('mqtt-realm', function () { realm = router.createRealm('test-realm') api = realm.wampApi() - let mqttGate = new MqttGate(router) + const mqttGate = new MqttGate(router) cli = mqttGate.createSession() ctx = mqttGate.createContext(cli, sender) @@ -323,11 +323,9 @@ describe('mqtt-realm', function () { it('at-connection-fail-will-publish', function (done) { realm.cleanupSession(cli) - router.getRealm = (realmName, cb) => {cb(realm)} + router.getRealm = (realmName, cb) => { cb(realm) } - sender.send = chai.spy( - function (msg, callback) {} - ) + sender.send = chai.spy(() => {}) cli.handle(ctx, { cmd: 'connect', retain: false, @@ -339,7 +337,7 @@ describe('mqtt-realm', function () { will: { retain: false, qos: 0, - topic: 'disconnect', + topic: 'topic-test-disconnect', payload: Buffer.from('{"text":"some-test-text"}') } }) @@ -347,24 +345,26 @@ describe('mqtt-realm', function () { var subSpy = chai.spy( function (publicationId, args, kwargs) { expect(args).to.deep.equal([]) - expect(kwargs).to.deep.equal({ text: "some-test-text" }) + expect(kwargs).to.deep.equal({ text: 'some-test-text' }) } ) - api.subscribe('disconnect', subSpy).then((subId) => { + api.subscribe('topic-test-disconnect', subSpy).then((subId) => { cli.cleanup() expect(subSpy).to.have.been.called.once() done() }) }) -/* it('connect-clientid', function () { + it('connect-clientid', function (done) { realm.cleanupSession(cli) - router.getRealm = (realmName, cb) => {cb(realm)} + router.getRealm = (realmName, cb) => { cb(realm) } let i = 0 - sender.send = chai.spy((msg, callback) => { + sender.send = chai.spy((msg) => { sender.send = nextPublish - console.log('TEST-MSG', ++i, msg) + + expect(msg.cmd).to.equal('connack') + expect(msg.sessionPresent).to.equal(false) cli.handle(ctx, { cmd: 'subscribe', @@ -379,29 +379,47 @@ describe('mqtt-realm', function () { }) }) + let pubMsgId + const nextPublish = chai.spy((msg) => { - sender.send = nextConnect - console.log('TEST-MSG', ++i, msg) + sender.send = nextPuback + + expect(msg.cmd).to.equal('suback') + pubMsgId = msg.messageId api.publish('topic1', [], { data: 1 }, { trace: true }) expect(realm.engine._messages.length, 'trace message need to be saved').to.equal(1) + }) + + const nextPuback = chai.spy((msg) => { + sender.send = nextConnect + + expect(msg.cmd).to.equal('publish') + expect(msg.topic).to.equal('topic1') + expect(msg.qos).to.equal(1) + expect(msg.payload.toString()).to.equal('{"data":1}') cli.handle(ctx, { - cmd: 'disconnect', + cmd: 'puback', retain: false, qos: 0, dup: false, - length: 0, + length: 2, topic: null, - payload: null + payload: null, + messageId: pubMsgId }) - }) - const nextConnect = chai.spy((msg) => { - sender.send = nextConnack2 - console.log('TEST-MSG', ++i, msg) + const gotRow = chai.spy((key, value) => { + expect(key).to.deep.equal(['$FOX', 'clientOffset', 'agent-state']) + expect(value).to.equal(pubMsgId) + }) - // api.publish('topic1', [], { data: 2 }, { trace: true }) + realm.getKey(['$FOX', 'clientOffset', 'agent-state'], gotRow) + expect(gotRow).to.have.been.called.once() + + cli.cleanup() + api.publish('topic1', [], { data: 2 }, { trace: true }) cli.handle(ctx, { cmd: 'connect', @@ -412,14 +430,16 @@ describe('mqtt-realm', function () { topic: null, payload: null, clean: false, - username: 'user@realm', + username: 'user@test-realm', clientId: 'agent-state' }) }) - const nextConnack2 = chai.spy((msg) => { - sender.send = nextEventReceive - console.log('RTEST-MSG', ++i, msg) + const nextConnect = chai.spy((msg) => { + sender.send = nextSubReceive + + expect(msg.cmd).to.equal('connack') + expect(msg.sessionPresent).to.equal(true) cli.handle(ctx, { cmd: 'subscribe', @@ -434,8 +454,17 @@ describe('mqtt-realm', function () { }) }) - const nextEventReceive = chai.spy((msg) => { - console.log('NTEST-MSG', ++i, msg) + const nextSubReceive = chai.spy((msg) => { + sender.send = doneEventReceive + expect(msg.cmd).to.equal('suback') + }) + + const doneEventReceive = chai.spy((msg) => { + expect(msg.cmd).to.equal('publish') + expect(msg.topic).to.equal('topic1') +// expect(msg.qos).to.equal(1) + expect(msg.payload.toString()).to.equal('{"data":2}') + done() }) // START HERE @@ -448,9 +477,8 @@ describe('mqtt-realm', function () { topic: null, payload: null, clean: false, - username: 'user@realm', + username: 'user@test-realm', clientId: 'agent-state' }) - - }) */ + }) }) diff --git a/test/realm.js b/test/realm.js index e318d7b..42db00c 100644 --- a/test/realm.js +++ b/test/realm.js @@ -7,7 +7,7 @@ const expect = chai.expect const WAMP = require('../lib/wamp/protocol') const WampGate = require('../lib/wamp/gate') const Router = require('../lib/router') -const MemKeyValueStorage = require('../lib/realm').MemKeyValueStorage +const MemKeyValueStorage = require('../lib/memkv').MemKeyValueStorage chai.use(spies) @@ -53,7 +53,7 @@ describe('wamp-realm', function () { expect(msg[1]).to.equal(WAMP.CALL) expect(msg[2]).to.equal(1234) expect(msg[4]).to.equal('wamp.error.no_such_procedure') - expect(msg[5]).to.deep.equal([ 'no callee registered for procedure ' ]) + expect(msg[5]).to.deep.equal(['no callee registered for procedure ']) } ) cli.handle(ctx, [WAMP.CALL, 1234, {}, 'any.function.name', []]) @@ -431,9 +431,31 @@ describe('wamp-realm', function () { // console.log('key', realm.getKey('topic2')) }) + it('wamp-key-remove', function () { + cli.handle(ctx, [WAMP.PUBLISH, 1234, { retain: true }, 'topic2', [], { some: 'value' }]) + + const gotRow = chai.spy((key, value) => { + expect(key).to.deep.equal(['topic2']) + expect(value).to.deep.equal({ args: [], kwargs: { some: 'value' } }) + }) + + const noRow = chai.spy(() => {}) + + return realm.getKey(['topic2'], gotRow).then(() => { + expect(gotRow).to.have.been.called.exactly(1) + + // no kwargs sent if kwargs passed as null + cli.handle(ctx, [WAMP.PUBLISH, 1234, { retain: true }, 'topic2', []]) + + return realm.getKey(['topic2'], noRow) + }).then(() => { + expect(noRow, 'row need to be removed').to.have.not.been.called() + }) + }) + it('reduce-one', function () { sender.send = chai.spy((msg, callback) => { -// console.log('REDUCE-CALL', msg); + // console.log('REDUCE-CALL', msg); }) cli.handle(ctx, [WAMP.REGISTER, 1234, { reducer: true }, 'storage']) @@ -454,7 +476,7 @@ describe('wamp-realm', function () { expect(data).to.deep.equal({ args: [], kwargs: { fullName: 'John Doe' } }) }) app.getKey(['*', 'john'], row) - expect(row).to.have.been.called.exactly(1) + expect(row, 'data has to be saved').to.have.been.called.exactly(1) sender.send = chai.spy((msg, callback) => { if (msg[1] === WAMP.EVENT) {