Skip to content

Commit

Permalink
watch for key value changes
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Mar 5, 2020
1 parent 934461c commit 95cbef4
Show file tree
Hide file tree
Showing 11 changed files with 291 additions and 169 deletions.
11 changes: 4 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,17 @@ message for the subscription. And here what could be implemented

```javascript
publish('the.key', [ 'args' ], { kwArgs: true }, {
retain: 100,
// weak: 'public',
retain: true,
when: { status: 'started' },
watch: false
will: { value: 'to', publish: 'at', session: 'disconnect' }
})
```

### Options Description
* retain: time in seconds to keep the message in the server memory. Zero means forever. Default value is false that means message does no retain.
* weak: The key disappears then client disconnects. (private|public) who could see the message, public by default
* when: publish only if the key meets requirements. null means that key should not be exists.
* watch: applicable if `when` option defined. Provide ability to wait the necesssary condition and do action immediately. If several clients waits for that the only one achieves acknowledge message.
* sequence: generate unique key
* retain: boolean, keep in Key Value storage. Default value is false that means message does not retain.
* when: struct, publish only if the key meets requirements. null means that key should not be exists.
* watch: boolean, applicable if `when` option defined. Provide ability to wait the necesssary condition and do action immediately. If several clients waits for that the only one achieves acknowledge message.

### Aggregate Engine for the data streams

Expand Down
9 changes: 0 additions & 9 deletions lib/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
class Context {
constructor (router, session, sender) {
this.router = router
this.id = undefined
this.session = session
this.sender = sender
}
Expand All @@ -16,14 +15,6 @@ class Context {
return this.session
}

setId (id) {
this.id = id
}

getId () {
return this.id
}

emit (event, message, data) {
this.router.emit(event, this.session, message, data)
}
Expand Down
103 changes: 65 additions & 38 deletions lib/memkv.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,65 +22,92 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
})
}

isDataFit (when, data) {
if (when === null && data === null) {
return true
}

if (when !== null && data !== null) {
return true
}

return false
}

setKeyActor (actor) {
const opt = actor.getOpt()
const suri = restoreUri(extract(actor.getUri(), this.getUriPattern()))
const data = actor.getData()

const pub = () => {
if (actor) {
this.pubActor(actor)
}
if (data === null) {
this._keyDb.delete(suri)
} else {
this._keyDb.set(suri, [actor.getSid(), data])
let oldSid = null
let oldData = null
let oldWill = null
let resWhen = []

const findWhenActor = (curActor) => {
for (let i = 0; i < resWhen.length; i++) {
const whenActor = resWhen[i]
if (this.isDataFit(whenActor.getOpt().when, curActor.getData())) {
resWhen.splice(i, 1)
return whenActor
}
}
return false
}
const row = this._keyDb.get(suri)

if ('when' in opt) {
if (opt.when === null) {

if (undefined === row) {
pub()
return
const pub = (newActor) => {
do {
const newOpt = newActor.getOpt()
const newData = newActor.getData()
const willSid = 'will' in newOpt ? newActor.getSid() : null
this.pubActor(newActor)
if (newData === null) {
this._keyDb.delete(suri)
} else {
this._keyDb.set(suri, [willSid, newData, newOpt.will, resWhen])
}
const [sid, val, when] = row
newActor = findWhenActor(newActor)
} while (newActor)
}

if (val === null) {
pub()
return
}
const opt = actor.getOpt()
const oldRow = this._keyDb.get(suri)
if (oldRow) {
[oldSid, oldData, oldWill, resWhen] = oldRow
}

actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'Found value is not empty')
if ('when' in opt) {
if (this.isDataFit(opt.when, oldData)) {
pub(actor)
return
} else if (opt.watch) {
resWhen.push(actor)
return
} else {
actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'not accepted')
return
}

if (undefined !== row) {
const [sid, val, when] = row
if (val !== null) {
pub()
return
}
}
actor.reject(errorCodes.ERROR_INVALID_PAYLOAD, 'Value is empty')
return
}

pub()
// no when publish
pub(actor)
}

removeSession (sessionId) {
let toRemove = []
for (let key in this._keyDb) {
const keySessionId = this._keyDb.get(key)[0]
for (const [key, value] of this._keyDb) {
const keySessionId = value[0]
if (keySessionId === sessionId) {
toRemove.push(key)
}
}
for (let i = 0; i < toRemove.length; i++) {
this.setKeyData(toRemove[i], null)
const key = toRemove[i]
const row = this._keyDb.get(key)
const will = row[2]

if (row[2]) {
this.setKeyData(key, will)
} else {
this.setKeyData(key, null)
}
}
}
}
Expand Down
18 changes: 13 additions & 5 deletions lib/mqtt/gate.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ let handlers = {}
let cmdAck = {}

class MqttContext extends Context {
setMqttPkg (mqttPkg) {
this.mqttPkg = mqttPkg
}

getMqttPkg () {
return this.mqttPkg
}

sendEvent (cmd) {
const session = this.getSession()
if (session.getLastPublishedId() == cmd.qid) {
Expand Down Expand Up @@ -118,19 +126,19 @@ class MqttContext extends Context {

sendError (cmd, code, text) {
if (undefined === cmd.id) {
throw new RealmError(this.getId(), code, text)
throw new RealmError(undefined, code, text)
}
else {
// subscribe error mode
const pkg = this.getId()
const pkg = this.getMqttPkg()
pkg.granted[cmd.id] = 0x80
this.mqttSubscribeDone()
}
}

mqttSubscribeDone () {
// granted is accepted QoS for the subscription, possible [0,1,2, + 128 or 0x80 for failure]
const pkg = this.getId()
const pkg = this.getMqttPkg()
pkg.count--
if (pkg.count === 0) {
this.mqttSend({ cmd: 'suback', messageId: pkg.id, granted: pkg.granted })
Expand Down Expand Up @@ -300,7 +308,7 @@ cmdAck.publish = function (cmd) {

handlers.puback = function (ctx, session, message) {
this.checkRealm(session)
let qid = session.fetchWaitId(message.messageId)
const qid = session.fetchWaitId(message.messageId)
if (qid) {
session.realm.doConfirm(ctx, {
id: qid
Expand All @@ -325,7 +333,7 @@ handlers.subscribe = function (ctx, session, message) {
granted: [],
count: message.subscriptions.length
}
ctx.setId(pkg)
ctx.setMqttPkg(pkg)
const afterId = session.getLastPublishedId()
for (let index=0; index < message.subscriptions.length; index++) {
const qos = Math.min(message.subscriptions[index].qos, 1)
Expand Down
48 changes: 17 additions & 31 deletions lib/realm.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class Actor {

getOpt () {
if (this.msg.opt !== undefined) {
return this.msg.opt
return Object.assign({}, this.msg.opt)
} else {
return {}
}
Expand Down Expand Up @@ -188,6 +188,7 @@ class ActorPush extends Actor {
constructor (ctx, msg) {
super(ctx, msg)
this.clientNotified = false
this.eventId = null
}

setEventId (eventId) {
Expand All @@ -203,7 +204,6 @@ class ActorPush extends Actor {
clone.qid = this.eventId
this.ctx.acknowledged(clone)
}
// this.realm.engine.qConf.doneDefer(this.sid, this.eventId)
}
}

Expand Down Expand Up @@ -277,7 +277,6 @@ class BaseEngine {
this.qYield = new DeferMap()

this.wTrace = new Qlobber() // [uri][subscription]
this.qConf = new DeferMap()
}

getSubStack (uri) {
Expand Down Expand Up @@ -417,30 +416,23 @@ class BaseEngine {
// By default, a Publisher of an event will not itself receive an event published,
// even when subscribed to the topic the Publisher is publishing to.
// If supported by the Broker, this behavior can be overridden via the option exclude_me set to false.
dispatch (actor) {
actor.destSID = {}

let found = this.matchTrace(actor.getUri())
dispatch (event, excludeSid) {
let destSID = {}
const found = this.matchTrace(event.uri)
for (let i = 0; i < found.length; i++) {
let subD = found[i]
actor.destSID[subD.getSid()] = true

if (actor.getSid() !== subD.getSid() ||
(!actor.getOpt().exclude_me)
) {
this.actorPush(subD, actor.getEvent())
const subD = found[i]
if (excludeSid !== subD.getSid()) {
destSID[subD.getSid()] = true
this.actorPush(subD, event)
}
}
this.actorConfirm(actor, actor.msg)
return destSID
}

doPush (actor) {
this.qConf.addDefer(actor, actor.eventId)
this.dispatch(actor)
}

doConfirm (actor, cmd) {
this.actorConfirm(actor, cmd)
const excludeSid = actor.getOpt().exclude_me ? actor.getSid() : 0
actor.destSID = this.dispatch(actor.getEvent(), excludeSid)
this.actorConfirm(actor, actor.msg)
}

getHistoryAfter (after, uri, cbRow) { return new Promise(() => {}) }
Expand Down Expand Up @@ -536,13 +528,7 @@ class BaseRealm extends EventEmitter {
}
}

doConfirm (ctx, cmd) {
const session = ctx.getSession()
const defer = this.engine.qConf.getDefer(session.sessionId, cmd.qid)
if (defer) {
this.engine.doConfirm(defer, cmd)
}
}
doConfirm (ctx, cmd) {}

// declare wamp.topic.history.after (uri, arg)
// last limit|integer
Expand Down Expand Up @@ -726,7 +712,7 @@ class ActorPushKv {
}

getOpt () {
return this.opt
return Object.assign({}, this.opt)
}

getUri () {
Expand Down Expand Up @@ -792,9 +778,9 @@ class MemEngine extends BaseEngine {
this._messages = []
}

dispatch (actor) {
dispatch (event, excludeSid) {
// event.qid = ++this._messageGen
return super.dispatch(actor)
return super.dispatch(event, excludeSid)
}

doPush (actor) {
Expand Down
22 changes: 20 additions & 2 deletions lib/wamp/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ class WampApiContext extends Context {
}

acknowledged (cmd) {
// console.log('ACK message not handled', cmd)
if (cmd.id && cmd.id.resolve) {
cmd.id.resolve(cmd.qid)
}
Expand Down Expand Up @@ -109,7 +108,26 @@ function WampApi (realm) {
if (opt.exclude_me !== false) {
opt.exclude_me = true
}
return realm.doPush(ctx, { uri: wampParse(uri), opt, data: { args, kwargs } })
let result
let subContainer
let ack
if (opt.acknowledge) {
ack = true
delete opt.acknowledge
subContainer = {}
result = new Promise((resolve, reject) => {
subContainer.resolve = resolve
subContainer.reject = reject
})
}
realm.doPush(ctx, {
id: subContainer,
uri: wampParse(uri),
opt,
data: { args, kwargs },
ack
})
return result
}

// gate override/internal part
Expand Down
Loading

0 comments on commit 95cbef4

Please sign in to comment.