diff --git a/.gitignore b/.gitignore index 28d1427..e058213 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ coverage/* .nyc_output .idea/ **/*.iml +dbfiles/* diff --git a/README.md b/README.md index 436d567..16cf5a5 100644 --- a/README.md +++ b/README.md @@ -84,13 +84,11 @@ register('reduce.the.key.#', (args, kwargs, options) => { {reducer: true}) ``` -## Retained Storage Roadmap -It is good to have some storage to keep last published message. The server -has to maintain persistence of keys and provide the value as immediate first -message for the subscription. And here what could be implemented +## Retained Storage +There is storage to keep last published message. The server maintains persistence of keys. The values are provided as immediate first message for the subscription if `retained` flag is pecified. ```javascript -publish('the.key', [ 'args' ], { kwArgs: true }, { +session.publish('the.key', [ 'args' ], { kwArgs: true }, { retain: true, when: { status: 'started' }, watch: false @@ -98,10 +96,15 @@ publish('the.key', [ 'args' ], { kwArgs: true }, { }) ``` -### Options Description +### Publish Options Description * retain: boolean, keep in Key Value storage. Default value is false that means message does not retain. * when: struct, publish only if the key meets requirements. null means that key should not be exists. -* watch: boolean, applicable if `when` option defined. Provide ability to wait the necesssary condition and do action immediately. If several clients waits for that the only one achieves acknowledge message. +* watch: boolean, applicable if `when` option defined. Provide ability to wait for the necesssary condition and then do action immediately. If several clients waits for same value the only one achieves acknowledge of message. +* will: value that will be assigned at the session disconnect. If the value is changed by another +process the will value is cleaned. + +### Subscribe Options +* retained: boolean, corresponding values from key value storage will be returned as immidiate events. ### Aggregate Engine for the data streams diff --git a/democli/resource-lock.js b/democli/resource-lock.js new file mode 100644 index 0000000..6db3d9c --- /dev/null +++ b/democli/resource-lock.js @@ -0,0 +1,96 @@ +// AUTOBAHN_DEBUG = true; +const autobahn = require('autobahn') +const program = require('commander') + +program + .option('-s, --server ', 'Server URI address', 'ws://127.0.0.1:9000/wamp') + .parse(process.argv) + +console.log('connect to server:', program.server) + +var user = 'joe' +var key = 'joe-secret' + +// this callback is fired during authentication +function onchallenge (session, method, extra) { + if (method === 'ticket') { + return key + } + else if (method === 'wampcra') { + return autobahn.auth_cra.sign(key, extra.challenge) + } + else { + throw Error("don't know how to authenticate using '" + method + "'") + } +} + +const connection = new autobahn.Connection({ + url: program.server, + realm: 'realm1', + authmethods: ['ticket', 'wampcra'], + authid: user, + tlsConfiguration: {}, + onchallenge: onchallenge +}) + +connection.onopen = function (session, details) { + session.log('Session open.', details) + + function waitForLockResource () { + session.publish( + 'myapp.resource', + [], + { pid: process.pid, value: 'handle-resource' }, + { acknowledge: true, retain: true, trace: true, when: null, will: null, watch: true } + ).then( + (result) => { + console.log('Resource Locked', result) + setTimeout( + unlockResource, + 5000 + ) + }, (reason) => { + console.log('FAILED', reason) + connection.close() + } + ) + } + + function unlockResource () { + console.log('Send Unlock by timeout') + session.publish( + 'myapp.resource', + [], + null, + { acknowledge: true, retain: true, trace: true } + ).then(() => { + setTimeout( + waitForLockResource, + 1000 + ) + }) + } + + // Define an event handler + function onEvent (publishArgs, kwargs, opts) { + console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ', kwargs) + } + + // Subscribe to a topic + session.subscribe('myapp.resource', onEvent, { retained: true }).then( + function (subscription) { + console.log('subscription successfull', subscription.topic) + }, + function (error) { + console.log('subscription failed', error) + } + ) + + waitForLockResource() +} + +connection.onclose = function (reason, details) { + console.log('disconnected', reason, details) +} + +connection.open() diff --git a/lib/context.js b/lib/context.js index f18991a..0c589d5 100644 --- a/lib/context.js +++ b/lib/context.js @@ -15,6 +15,10 @@ class Context { return this.session } + isActive () { + return this.session.isActive() + } + emit (event, message, data) { this.router.emit(event, this.session, message, data) } diff --git a/lib/memkv.js b/lib/memkv.js index 51e380b..cd98a57 100644 --- a/lib/memkv.js +++ b/lib/memkv.js @@ -3,6 +3,7 @@ const { match, extract, defaultParse, restoreUri } = require('./topic_pattern') const errorCodes = require('./realm_error').errorCodes const KeyValueStorageAbstract = require('./realm').KeyValueStorageAbstract +const isDataFit = require('./realm').isDataFit class MemKeyValueStorage extends KeyValueStorageAbstract { constructor () { @@ -22,18 +23,6 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { }) } - isDataFit (when, data) { - if (when === null && data === null) { - return true - } - - if (when !== null && data !== null) { - return true - } - - return false - } - setKeyActor (actor) { const suri = restoreUri(extract(actor.getUri(), this.getUriPattern())) @@ -45,7 +34,12 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { const findWhenActor = (curActor) => { for (let i = 0; i < resWhen.length; i++) { const whenActor = resWhen[i] - if (this.isDataFit(whenActor.getOpt().when, curActor.getData())) { + if (!whenActor.isActive()) { + resWhen.splice(i, 1) + i-- + continue + } + if (isDataFit(whenActor.getOpt().when, curActor.getData())) { resWhen.splice(i, 1) return whenActor } @@ -75,7 +69,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { } if ('when' in opt) { - if (this.isDataFit(opt.when, oldData)) { + if (isDataFit(opt.when, oldData)) { pub(actor) return } else if (opt.watch) { @@ -93,6 +87,14 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { removeSession (sessionId) { let toRemove = [] for (const [key, value] of this._keyDb) { + const resWhen = value[3] + for (let i = resWhen.length - 1; i >= 0; i--) { + const whenActor = resWhen[i] + if (whenActor.getSid() === sessionId) { + resWhen.splice(i, 1) + } + } + const keySessionId = value[0] if (keySessionId === sessionId) { toRemove.push(key) @@ -102,8 +104,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract { const key = toRemove[i] const row = this._keyDb.get(key) const will = row[2] - - if (row[2]) { + if (will) { this.setKeyData(key, will) } else { this.setKeyData(key, null) diff --git a/lib/realm.js b/lib/realm.js index 047b8bb..78e39f0 100644 --- a/lib/realm.js +++ b/lib/realm.js @@ -56,6 +56,10 @@ class Actor { // realm is not available when client already disconnected return this.ctx.session.realm } + + isActive () { + return this.ctx.isActive() + } } class ActorCall extends Actor { @@ -229,6 +233,17 @@ class ActorPush extends Actor { } } +function isDataFit (when, data) { + if (when === null && data === null) { + return true + } + + if (when !== null && data !== null) { + return true + } + return false +} + class DeferMap { constructor () { /** @@ -813,6 +828,8 @@ exports.ActorCall = ActorCall exports.ActorTrace = ActorTrace exports.ActorPush = ActorPush +exports.isDataFit = isDataFit + exports.ActorPushKv = ActorPushKv exports.KeyValueStorageAbstract = KeyValueStorageAbstract exports.BaseEngine = BaseEngine diff --git a/lib/session.js b/lib/session.js index 9d023c1..f9e13ad 100644 --- a/lib/session.js +++ b/lib/session.js @@ -14,6 +14,7 @@ function Session (gate, sessionId) { let lastPublishedId = '' let publishMap = new Map() let userDetails = {} + let active = true /** trace commands @@ -158,6 +159,11 @@ function Session (gate, sessionId) { if (this.realm) { this.realm.cleanupSession(this) } + active = false + } + + this.isActive = function () { + return active } this.getGateProtocol = function () { diff --git a/package-lock.json b/package-lock.json index 5c756f5..23c92f3 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,6 +1,6 @@ { "name": "fox-wamp", - "version": "0.7.3", + "version": "0.7.4", "lockfileVersion": 1, "requires": true, "dependencies": { diff --git a/package.json b/package.json index f681989..4d0d5ed 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fox-wamp", - "version": "0.7.3", + "version": "0.7.4", "description": "Web Application Message Router/Server WAMP/MQTT", "author": { "name": "Anatoly Tsapkov",