Skip to content

Commit

Permalink
resource lock demo app
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Mar 6, 2020
1 parent 95cbef4 commit ea14584
Show file tree
Hide file tree
Showing 9 changed files with 153 additions and 25 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@ coverage/*
.nyc_output
.idea/
**/*.iml
dbfiles/*
17 changes: 10 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,24 +84,27 @@ register('reduce.the.key.#', (args, kwargs, options) => {
{reducer: true})
```

## Retained Storage Roadmap
It is good to have some storage to keep last published message. The server
has to maintain persistence of keys and provide the value as immediate first
message for the subscription. And here what could be implemented
## Retained Storage
There is storage to keep last published message. The server maintains persistence of keys. The values are provided as immediate first message for the subscription if `retained` flag is pecified.

```javascript
publish('the.key', [ 'args' ], { kwArgs: true }, {
session.publish('the.key', [ 'args' ], { kwArgs: true }, {
retain: true,
when: { status: 'started' },
watch: false
will: { value: 'to', publish: 'at', session: 'disconnect' }
})
```

### Options Description
### Publish Options Description
* retain: boolean, keep in Key Value storage. Default value is false that means message does not retain.
* when: struct, publish only if the key meets requirements. null means that key should not be exists.
* watch: boolean, applicable if `when` option defined. Provide ability to wait the necesssary condition and do action immediately. If several clients waits for that the only one achieves acknowledge message.
* watch: boolean, applicable if `when` option defined. Provide ability to wait for the necesssary condition and then do action immediately. If several clients waits for same value the only one achieves acknowledge of message.
* will: value that will be assigned at the session disconnect. If the value is changed by another
process the will value is cleaned.

### Subscribe Options
* retained: boolean, corresponding values from key value storage will be returned as immidiate events.

### Aggregate Engine for the data streams

Expand Down
96 changes: 96 additions & 0 deletions democli/resource-lock.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
// AUTOBAHN_DEBUG = true;
const autobahn = require('autobahn')
const program = require('commander')

program
.option('-s, --server <server>', 'Server URI address', 'ws://127.0.0.1:9000/wamp')
.parse(process.argv)

console.log('connect to server:', program.server)

var user = 'joe'
var key = 'joe-secret'

// this callback is fired during authentication
function onchallenge (session, method, extra) {
if (method === 'ticket') {
return key
}
else if (method === 'wampcra') {
return autobahn.auth_cra.sign(key, extra.challenge)
}
else {
throw Error("don't know how to authenticate using '" + method + "'")
}
}

const connection = new autobahn.Connection({
url: program.server,
realm: 'realm1',
authmethods: ['ticket', 'wampcra'],
authid: user,
tlsConfiguration: {},
onchallenge: onchallenge
})

connection.onopen = function (session, details) {
session.log('Session open.', details)

function waitForLockResource () {
session.publish(
'myapp.resource',
[],
{ pid: process.pid, value: 'handle-resource' },
{ acknowledge: true, retain: true, trace: true, when: null, will: null, watch: true }
).then(
(result) => {
console.log('Resource Locked', result)
setTimeout(
unlockResource,
5000
)
}, (reason) => {
console.log('FAILED', reason)
connection.close()
}
)
}

function unlockResource () {
console.log('Send Unlock by timeout')
session.publish(
'myapp.resource',
[],
null,
{ acknowledge: true, retain: true, trace: true }
).then(() => {
setTimeout(
waitForLockResource,
1000
)
})
}

// Define an event handler
function onEvent (publishArgs, kwargs, opts) {
console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ', kwargs)
}

// Subscribe to a topic
session.subscribe('myapp.resource', onEvent, { retained: true }).then(
function (subscription) {
console.log('subscription successfull', subscription.topic)
},
function (error) {
console.log('subscription failed', error)
}
)

waitForLockResource()
}

connection.onclose = function (reason, details) {
console.log('disconnected', reason, details)
}

connection.open()
4 changes: 4 additions & 0 deletions lib/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ class Context {
return this.session
}

isActive () {
return this.session.isActive()
}

emit (event, message, data) {
this.router.emit(event, this.session, message, data)
}
Expand Down
33 changes: 17 additions & 16 deletions lib/memkv.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
const { match, extract, defaultParse, restoreUri } = require('./topic_pattern')
const errorCodes = require('./realm_error').errorCodes
const KeyValueStorageAbstract = require('./realm').KeyValueStorageAbstract
const isDataFit = require('./realm').isDataFit

class MemKeyValueStorage extends KeyValueStorageAbstract {
constructor () {
Expand All @@ -22,18 +23,6 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
})
}

isDataFit (when, data) {
if (when === null && data === null) {
return true
}

if (when !== null && data !== null) {
return true
}

return false
}

setKeyActor (actor) {
const suri = restoreUri(extract(actor.getUri(), this.getUriPattern()))

Expand All @@ -45,7 +34,12 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
const findWhenActor = (curActor) => {
for (let i = 0; i < resWhen.length; i++) {
const whenActor = resWhen[i]
if (this.isDataFit(whenActor.getOpt().when, curActor.getData())) {
if (!whenActor.isActive()) {
resWhen.splice(i, 1)
i--
continue
}
if (isDataFit(whenActor.getOpt().when, curActor.getData())) {
resWhen.splice(i, 1)
return whenActor
}
Expand Down Expand Up @@ -75,7 +69,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
}

if ('when' in opt) {
if (this.isDataFit(opt.when, oldData)) {
if (isDataFit(opt.when, oldData)) {
pub(actor)
return
} else if (opt.watch) {
Expand All @@ -93,6 +87,14 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
removeSession (sessionId) {
let toRemove = []
for (const [key, value] of this._keyDb) {
const resWhen = value[3]
for (let i = resWhen.length - 1; i >= 0; i--) {
const whenActor = resWhen[i]
if (whenActor.getSid() === sessionId) {
resWhen.splice(i, 1)
}
}

const keySessionId = value[0]
if (keySessionId === sessionId) {
toRemove.push(key)
Expand All @@ -102,8 +104,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
const key = toRemove[i]
const row = this._keyDb.get(key)
const will = row[2]

if (row[2]) {
if (will) {
this.setKeyData(key, will)
} else {
this.setKeyData(key, null)
Expand Down
17 changes: 17 additions & 0 deletions lib/realm.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ class Actor {
// realm is not available when client already disconnected
return this.ctx.session.realm
}

isActive () {
return this.ctx.isActive()
}
}

class ActorCall extends Actor {
Expand Down Expand Up @@ -229,6 +233,17 @@ class ActorPush extends Actor {
}
}

function isDataFit (when, data) {
if (when === null && data === null) {
return true
}

if (when !== null && data !== null) {
return true
}
return false
}

class DeferMap {
constructor () {
/**
Expand Down Expand Up @@ -813,6 +828,8 @@ exports.ActorCall = ActorCall
exports.ActorTrace = ActorTrace
exports.ActorPush = ActorPush

exports.isDataFit = isDataFit

exports.ActorPushKv = ActorPushKv
exports.KeyValueStorageAbstract = KeyValueStorageAbstract
exports.BaseEngine = BaseEngine
Expand Down
6 changes: 6 additions & 0 deletions lib/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ function Session (gate, sessionId) {
let lastPublishedId = ''
let publishMap = new Map()
let userDetails = {}
let active = true

/**
trace commands
Expand Down Expand Up @@ -158,6 +159,11 @@ function Session (gate, sessionId) {
if (this.realm) {
this.realm.cleanupSession(this)
}
active = false
}

this.isActive = function () {
return active
}

this.getGateProtocol = function () {
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "fox-wamp",
"version": "0.7.3",
"version": "0.7.4",
"description": "Web Application Message Router/Server WAMP/MQTT",
"author": {
"name": "Anatoly Tsapkov",
Expand Down

0 comments on commit ea14584

Please sign in to comment.