Skip to content

Commit

Permalink
refactor kv storage
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Mar 20, 2021
1 parent 1acb8ce commit f06c0b5
Show file tree
Hide file tree
Showing 9 changed files with 272 additions and 254 deletions.
6 changes: 3 additions & 3 deletions lib/binder.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ class ReactEngine extends BaseEngine {
this.binder = binder
}

doPush (actor) {
return this.binder.doPush(this, actor)
keepHistory (actor) {
return this.binder.keepHistory(this, actor)
}

getHistoryAfter (after, uri, cbRow) {
return this.binder.getHistoryAfter(this, after, uri, cbRow)
}

cleanupSession(sessionId) { // override
cleanupSession(sessionId) {
return this.binder.cleanupSession(this, sessionId)
}
}
Expand Down
75 changes: 5 additions & 70 deletions lib/mono/membinder.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { match, intersect, merge, extract } = require('../topic_pattern')
const { match } = require('../topic_pattern')
const { AbstractBinder, BaseEngine, BaseRealm } = require('../realm')
const { MemKeyValueStorage } = require('./memkv')

Expand All @@ -9,40 +9,20 @@ class MemEngine extends BaseEngine {
super()
this._messageGen = 0
this._messages = []
this._kvo = [] // key value order
}

setKeyActor (actor) {
const uri = actor.getUri()
for (let i = this._kvo.length - 1; i >= 0; i--) {
const kvr = this._kvo[i]
if (match(uri, kvr.uri)) {
kvr.kv.setKeyActor(actor)
break
}
}
}

memPush (actor) {
keepHistory (actor) {
actor.setEventId(++this._messageGen)

actor.destSID = this.dispatch(actor.getEvent())
actor.confirm(actor.msg)

if (actor.getOpt().trace) {
this._messages.push(actor.getEvent())
if (this._messages.length > 10100) {
this._messages = this._messages.splice(100)
}
}
}

doPush (actor) {
if (actor.getOpt().retain) {
this.setKeyActor(actor)
} else {
this.memPush(actor)
}
actor.destSID = this.dispatch(actor.getEvent())
actor.confirm(actor.msg)
}

getHistoryAfter (after, uri, cbRow) {
Expand All @@ -56,59 +36,14 @@ class MemEngine extends BaseEngine {
resolve()
})
}

getKey (uri, cbRow) {
const done = []
for (let i = this._kvo.length - 1; i >= 0; i--) {
const kvr = this._kvo[i]
// console.log('MATCH++', uri, kvr.uri, extract(uri, kvr.uri))
if (intersect(uri, kvr.uri)) {
done.push(kvr.kv.getKey(
extract(uri, kvr.uri),
(aKey, data) => {
cbRow(merge(aKey, kvr.uri), data)
}
))
}
}
return Promise.all(done)
}

setKeyData (uri, data) {
for (let i = this._kvo.length - 1; i >= 0; i--) {
const kvr = this._kvo[i]
if (match(uri, kvr.uri)) {
kvr.kv.setKeyData(extract(uri, kvr.uri), data)
break
}
}
}

registerKeyValueEngine (uri, kv) {
kv.setUriPattern(uri)
kv.pubActor = this.memPush.bind(this)
this._kvo.push({ uri, kv })
}

cleanupSession(sessionId) { // override
for (let i = this._kvo.length - 1; i >= 0; i--) {
this._kvo[i].kv.removeSession(sessionId)
}
}
}

class MemRealm extends BaseRealm {
registerKeyValueEngine (uri, kv) {
return this.engine.registerKeyValueEngine(uri, kv)
}
}

class MemBinder extends AbstractBinder {
createRealm (router) {
const engine = new MemEngine()
engine.registerKeyValueEngine(['#'], new MemKeyValueStorage())

return new MemRealm(
return new BaseRealm(
router,
engine
)
Expand Down
90 changes: 10 additions & 80 deletions lib/mono/memkv.js
Original file line number Diff line number Diff line change
@@ -1,76 +1,8 @@
'use strict'

const { merge, match, extract, defaultParse, restoreUri } = require('../topic_pattern')
const { match, extract, defaultParse, restoreUri } = require('../topic_pattern')
const errorCodes = require('../realm_error').errorCodes
const { isDataFit } = require('../realm')

class ActorPushKv {
constructor (uri, data, opt) {
this.uri = uri
this.data = data
this.opt = opt
this.eventId = null
}

getOpt () {
return Object.assign({}, this.opt)
}

getUri () {
return this.uri
}

getSid() {
return this.opt.sid
}

getData () {
return this.data
}

setEventId (id) {
this.eventId = id
}

getEvent () {
return {
qid: this.eventId,
uri: this.getUri(),
data: this.getData(),
opt: this.getOpt()
}
}

confirm () {}
}

class KeyValueStorageAbstract {
constructor () {
this.uriPattern = '#'
}

setUriPattern (uriPattern) {
this.uriPattern = uriPattern
}

getUriPattern () {
return this.uriPattern
}

// pubActor (actor) virtual abstract

setKeyData (key, data) {
this.setKeyActor(
new ActorPushKv(
merge(key, this.uriPattern),
data,
{})
)
}

// Promise:getKey (uri, cbRow) ::cbRow:: aKey, data
// removeSession (sessionId)
}
const { KeyValueStorageAbstract, isDataFit } = require('../realm')

class MemKeyValueStorage extends KeyValueStorageAbstract {
constructor () {
Expand Down Expand Up @@ -98,34 +30,34 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
let oldData = null
let resWhen = []

const findWhenActor = (curActor) => {
const findNextWhenActor = (curData) => {
for (let i = 0; i < resWhen.length; i++) {
const whenActor = resWhen[i]
if (!whenActor.isActive()) {
resWhen.splice(i, 1)
i--
continue
}
if (isDataFit(whenActor.getOpt().when, curActor.getData())) {
if (isDataFit(whenActor.getOpt().when, curData)) {
resWhen.splice(i, 1)
return whenActor
}
}
return false
}

const pub = (newActor) => {
const pubWhile = (newActor) => {
do {
const newOpt = newActor.getOpt()
const newData = newActor.getData()
const willSid = 'will' in newOpt ? newActor.getSid() : null
this.pubActor(newActor)
const willSid = ('will' in newOpt) ? newActor.getSid() : null
this.keepHistory(newActor)
if (newData === null) {
this._keyDb.delete(suri)
} else {
this._keyDb.set(suri, [willSid, newData, newOpt.will, resWhen])
}
newActor = findWhenActor(newActor)
newActor = findNextWhenActor(newActor.getData())
} while (newActor)
}

Expand All @@ -137,7 +69,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {

if ('when' in opt) {
if (isDataFit(opt.when, oldData)) {
pub(actor)
pubWhile(actor)
return
} else if (opt.watch) {
resWhen.push(actor)
Expand All @@ -148,7 +80,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
}
}
// no when publish
pub(actor)
pubWhile(actor)
}

removeSession (sessionId) {
Expand Down Expand Up @@ -180,6 +112,4 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
}
}

exports.KeyValueStorageAbstract = KeyValueStorageAbstract
exports.ActorPushKv = ActorPushKv
exports.MemKeyValueStorage = MemKeyValueStorage
30 changes: 14 additions & 16 deletions lib/mqtt/gate.js
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,18 @@ let handlers = {}
let cmdAck = {}

class MqttContext extends Context {
setMqttPkg (mqttPkg) {
setSubscribePkg (mqttPkg) {
this.mqttPkg = mqttPkg
}

getMqttPkg () {
getSubscribePkg () {
return this.mqttPkg
}

setMqttType (mqttType) {
this.mqttType = mqttType
}

sendEvent (cmd) {
const session = this.getSession()
if (session.getLastPublishedId() == cmd.qid) {
Expand Down Expand Up @@ -113,7 +117,7 @@ class MqttContext extends Context {
}

acknowledged (cmd) {
cmdAck[cmd.mtype].call(this, cmd)
cmdAck[this.mqttType].call(this, cmd)
}

mqttSend (msg, callback) {
Expand All @@ -130,15 +134,15 @@ class MqttContext extends Context {
}
else {
// subscribe error mode
const pkg = this.getMqttPkg()
const pkg = this.getSubscribePkg()
pkg.granted[cmd.id] = 0x80
this.mqttSubscribeDone()
}
}

mqttSubscribeDone () {
// granted is accepted QoS for the subscription, possible [0,1,2, + 128 or 0x80 for failure]
const pkg = this.getMqttPkg()
const pkg = this.getSubscribePkg()
pkg.count--
if (pkg.count === 0) {
this.mqttSend({ cmd: 'suback', messageId: pkg.id, granted: pkg.granted })
Expand Down Expand Up @@ -232,7 +236,6 @@ class MqttGate extends BaseGate {
}

const cmd = {
mtype: 'publish',
uri: mqttParse(message.topic),
data: (message.payload.length === 0
? null
Expand All @@ -253,14 +256,14 @@ class MqttGate extends BaseGate {
ctx.mqttClose(1003, 'protocol violation')
return
}
let mtype = msg.cmd
if (!handlers[mtype]) {
if (!handlers[msg.cmd]) {
this._router.emit(SESSION_WARNING, session, 'command not found', msg)
ctx.mqttClose(1003, 'protocol violation')
return
}
ctx.setMqttType(msg.cmd)
try {
handlers[mtype].call(this, ctx, session, msg)
handlers[msg.cmd].call(this, ctx, session, msg)
} catch (err) {
if (err instanceof RealmError) {
console.log(err)
Expand Down Expand Up @@ -333,7 +336,7 @@ handlers.subscribe = function (ctx, session, message) {
granted: [],
count: message.subscriptions.length
}
ctx.setMqttPkg(pkg)
ctx.setSubscribePkg(pkg)
const afterId = session.getLastPublishedId()
for (let index=0; index < message.subscriptions.length; index++) {
const qos = Math.min(message.subscriptions[index].qos, 1)
Expand All @@ -349,12 +352,7 @@ handlers.subscribe = function (ctx, session, message) {
if (message.retain) {
opt.retained = true
}
const cmd = {
mtype: 'subscribe',
id: index,
uri,
opt
}
const cmd = { id: index, uri, opt }
if (this.checkAuthorize(ctx, cmd, 'subscribe')) {
session.realm.doTrace(ctx, cmd)
}
Expand Down
Loading

0 comments on commit f06c0b5

Please sign in to comment.