Skip to content

Commit

Permalink
sqlite history
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Mar 30, 2021
1 parent f06c0b5 commit 58f23d9
Show file tree
Hide file tree
Showing 22 changed files with 5,844 additions and 556 deletions.
32 changes: 32 additions & 0 deletions allot/one.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
'use strict'

const sqlite3 = require('sqlite3')
const sqlite = require('sqlite')
const { DbBinder } = require('../lib/sqlite/dbbinder')
const Router = require('../index')
const { BaseRealm } = require('../lib/realm')
const { ReactEngine } = require('../lib/binder')

async function main () {
const db = await sqlite.open({
filename: '../dbfiles/msgdb.sqlite',
driver: sqlite3.Database
})

const binder = new DbBinder(db)
const maxId = await binder.init()
binder.startIntervalTimer()
console.log('loaded max id:', maxId)

const router = new Router()
router.createRealm = () => new BaseRealm(router, new ReactEngine(binder))
router.setLogTrace(true)
router.listenWAMP({ port: 9000 })
router.listenMQTT({ port: 1883 })
}

main().then((value) => {
console.log('DONE.')
}, (err) => {
console.error('ERROR:', err, err.stack)
})
5 changes: 3 additions & 2 deletions lib/binder.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
'use strict'

const { AbstractBinder, BaseEngine } = require('./realm')
const { BaseEngine, DeferMap } = require('./realm')

class ReactEngine extends BaseEngine {
constructor (binder) {
super()
this.binder = binder
this.qConfirm = new DeferMap()
}

keepHistory (actor) {
Expand All @@ -21,7 +22,7 @@ class ReactEngine extends BaseEngine {
}
}

class ReactBinder extends AbstractBinder {
class ReactBinder {
// getHistoryAfter(engine, after, uri, cbRow) abstract
// cleanupSession(engine, sessionId)
}
Expand Down
20 changes: 14 additions & 6 deletions lib/fox_router.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,13 @@ const WampServer = require('./wamp/transport')
const MqttServer = require('./mqtt/transport')
const WsMqttServer = require('./mqtt/ws_transport')
const Router = require('./router')
const { MemBinder } = require('./mono/membinder')
const {BaseRealm} = require('./realm')
const {MemEngine} = require('./mono/memengine')
const {MemKeyValueStorage} = require('./mono/memkv')

class FoxRouter extends Router {
constructor (binder) {
if (!binder) {
binder = new MemBinder()
}
super(binder)
constructor () {
super()
metaUser.registerHandlers(this)
}

Expand All @@ -41,6 +40,15 @@ class FoxRouter extends Router {
}
return new WsMqttServer(gate, wsOptions)
}

createRealm () {
const engine = new MemEngine()
engine.registerKeyValueEngine(['#'], new MemKeyValueStorage())
return new BaseRealm(
this,
engine
)
}
}

module.exports = FoxRouter
17 changes: 1 addition & 16 deletions lib/mono/membinder.js → lib/mono/memengine.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
'use strict'

const { match } = require('../topic_pattern')
const { AbstractBinder, BaseEngine, BaseRealm } = require('../realm')
const { MemKeyValueStorage } = require('./memkv')
const { BaseEngine } = require('../realm')

class MemEngine extends BaseEngine {
constructor () {
Expand Down Expand Up @@ -38,18 +37,4 @@ class MemEngine extends BaseEngine {
}
}

class MemBinder extends AbstractBinder {
createRealm (router) {
const engine = new MemEngine()
engine.registerKeyValueEngine(['#'], new MemKeyValueStorage())

return new BaseRealm(
router,
engine
)
}
}

exports.MemEngine = MemEngine
exports.MemKeyValueStorage = MemKeyValueStorage
exports.MemBinder = MemBinder
4 changes: 2 additions & 2 deletions lib/mono/memkv.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { match, extract, defaultParse, restoreUri } = require('../topic_pattern')
const { match, defaultParse } = require('../topic_pattern')
const errorCodes = require('../realm_error').errorCodes
const { KeyValueStorageAbstract, isDataFit } = require('../realm')

Expand All @@ -23,7 +23,7 @@ class MemKeyValueStorage extends KeyValueStorageAbstract {
}

setKeyActor (actor) {
const suri = restoreUri(extract(actor.getUri(), this.getUriPattern()))
const suri = this.getStrUri(actor)

// let oldSid
// let oldWill
Expand Down
21 changes: 5 additions & 16 deletions lib/realm.js
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,7 @@ class BaseRealm extends EventEmitter {
this.emit(SESSION_JOIN, session)
}

cleanupSession (session) {
leaveSession (session) {
this.emit(SESSION_LEAVE, session)
session.cleanupTrace(this.engine)
session.cleanupReg(this.engine)
Expand Down Expand Up @@ -850,23 +850,14 @@ class KeyValueStorageAbstract {
)
}

getStrUri (actor) {
return restoreUri(extract(actor.getUri(), this.getUriPattern()))
}

// Promise:getKey (uri, cbRow) ::cbRow:: aKey, data
// removeSession (sessionId)
}

class AbstractBinder extends EventEmitter {
// createRealm (router) abstract
}

class ZeroBinder extends AbstractBinder {
createRealm (router) {
return new BaseRealm(
router,
new BaseEngine()
)
}
}

exports.Actor = Actor
exports.ActorReg = ActorReg
exports.ActorCall = ActorCall
Expand All @@ -882,5 +873,3 @@ exports.BaseEngine = BaseEngine
exports.BaseRealm = BaseRealm
exports.ActorPushKv = ActorPushKv
exports.KeyValueStorageAbstract = KeyValueStorageAbstract
exports.AbstractBinder = AbstractBinder
exports.ZeroBinder = ZeroBinder
12 changes: 9 additions & 3 deletions lib/router.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ const EventEmitter = require('events').EventEmitter

const { REALM_CREATED, SESSION_TX, SESSION_RX, SESSION_WARNING } = require('./messages')
const tools = require('./tools')
const {BaseRealm, BaseEngine} = require('./realm')

class Router extends EventEmitter {
constructor (binder) {
constructor () {
super()
this._realms = new Map()
this._sessions = new Map()
this._binder = binder

this.on(SESSION_TX, function (session, data) {
this.trace('[' + session.sessionId + '] >', data)
Expand Down Expand Up @@ -68,7 +68,7 @@ class Router extends EventEmitter {
}

createRealm () {
return this._binder.createRealm(this)
return new BaseRealm(this, new BaseEngine())
}

addRealm(realmName, realm) {
Expand All @@ -77,6 +77,12 @@ class Router extends EventEmitter {
this.emit(REALM_CREATED, realm, realmName)
}

addEngine(realmName, engine) {
let realm = new BaseRealm(this, engine)
this.addRealm(realmName, realm)
return realm
}

getRealm (realmName, callback) {
if (this._realms.has(realmName)) {
callback(this._realms.get(realmName))
Expand Down
8 changes: 4 additions & 4 deletions lib/session.js
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ function Session (gate, sessionId) {
}

this.cleanup = function () {
if (this.realm && willPublishCmd && willPublishCtx) {
this.realm.doPush(willPublishCtx, willPublishCmd)
}
if (this.realm) {
this.realm.cleanupSession(this)
if (willPublishCmd && willPublishCtx) {
this.realm.doPush(willPublishCtx, willPublishCmd)
}
this.realm.leaveSession(this)
}
active = false
}
Expand Down
89 changes: 89 additions & 0 deletions lib/sqlite/dbbinder.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
'use strict'

const tools = require('../tools')
const { makeDataSerializable, unSerializeData } = require('../realm')
const { ReactBinder } = require('../binder')
const Msg = require('./msg')

class DbBinder extends ReactBinder {
constructor (db) {
super()
this.msg = new Msg(db)
this._messageGen = 0
this.dateStr = tools.keyDate(new Date())
}

startIntervalTimer () {
setInterval(() => {
let dateStr = tools.keyDate(new Date())
if (dateStr > this.dateStr) {
this._messageGen = 0
}
}, 60000)
}

async init () {
await this.msg.createTables()
let curId = await this.msg.getMaxId()
if (curId) {
this.parseId(curId)
}
return curId
}

parseId (encodedId) {
let newDateStr = encodedId.substr(0, 10)
let intLen = encodedId.charCodeAt(10) - 96
let newId = parseInt(encodedId.substr(11, intLen), 36)
if (newDateStr > this.dateStr) {
this._messageGen = newId
} else if (newDateStr == this.dateStr && newId > this._messageGen) {
this._messageGen = newId
}
}

getNewId () {
return this.dateStr + tools.keyId(++this._messageGen)
}

keepHistory (engine, actor) {
const id = this.getNewId()
actor.setEventId(id)
// console.log("getNewId", id, actor.getEvent())

let result
if (actor.getOpt().trace) {
result = this.msg.saveMsg(
id,
engine.getRealmName(),
actor.getUri(),
makeDataSerializable(actor.getData())
)
} else {
result = Promise.resolve()
}
result.then(() => {
actor.destSID = engine.dispatch(actor.getEvent())
actor.confirm(actor.msg)
})
}

getHistoryAfter (engine, after, uri, cbRow) {
return this.msg.getHistory(
engine.getRealmName(),
{ fromId: after, uri },
(event) => {
cbRow({
qid: event.id,
uri: event.uri,
data: unSerializeData(event.body)
})
}
)
}

cleanupSession(engine, sessionId) {
}
}

exports.DbBinder = DbBinder
71 changes: 71 additions & 0 deletions lib/sqlite/msg.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
'use strict'

const { defaultParse, restoreUri } = require('../topic_pattern')

function Msg (database) {
this.createTables = function () {
return database.run(
'CREATE TABLE IF NOT EXISTS msg (' +
'msg_id TEXT not null,' +
'msg_realm TEXT not null,' +
'msg_uri TEXT not null,' +
'msg_body TEXT,' +
'PRIMARY KEY (msg_id));'
)
}

this.getMaxId = function () {
return database.all(
'SELECT MAX(msg_id) max_id FROM msg', []
).then((result) => {
if (!Array.isArray(result)) {
return undefined
}
if (result.length === 0) {
return undefined
}
if (result[0].max_id) {
return result[0].max_id
} else {
return undefined
}
})
}

this.saveMsg = function (id, realm, uri, body) {
return database.run(
'insert into msg values (?,?,?,?);',
[id, realm, restoreUri(uri), JSON.stringify(body)]
)
}

this.getHistory = function (realm, range, rowcb) {
let sql = 'SELECT msg_id, msg_uri, msg_body FROM msg'
let where = []
if (range.fromId) {
where.push('msg_id > "' + range.fromId + '"')
}
if (range.toId) {
where.push('msg_id <= "' + range.toId + '"')
}

where.push('msg_realm = ?')
where.push('msg_uri = ?')
sql += ' WHERE ' + where.join(' AND ') + ' ORDER BY msg_id'

// console.log('range', [realm, restoreUri(range.uri)], sql)
return database.each(
sql,
[realm, restoreUri(range.uri)],
(err, row) => {
rowcb({
id: row.msg_id,
uri: defaultParse(row.msg_uri),
body: JSON.parse(row.msg_body)
})
}
)
}
}

module.exports = Msg
Loading

0 comments on commit 58f23d9

Please sign in to comment.