Skip to content

Commit

Permalink
leaderless back online
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Dec 9, 2024
1 parent 10a7ae3 commit 079b772
Show file tree
Hide file tree
Showing 13 changed files with 94 additions and 58 deletions.
4 changes: 2 additions & 2 deletions leaderless/entry.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ const conf_fox_port = process.env.FOX_PORT
|| console.log('FOX_PORT must be defined') || process.exit(1)

const conf_node_id = process.env.NODE_ID
|| console.log('ID must be defined') || process.exit(1)
|| console.log('NODE_ID must be defined') || process.exit(1)

const Router = require('../lib/router')
const { BaseRealm } = require('../lib/realm')
Expand All @@ -32,4 +32,4 @@ new WampServer(new WampGate(router), { port: conf_wamp_port })
new MqttServer(new MqttGate(router), { port: conf_mqtt_port })
new FoxNetServer(new FoxGate(router), { port: conf_fox_port })

console.log('Listening WAMP port:', conf_wamp_port)
console.log('at NODE_ID:', conf_node_id, 'listening WAMP:', conf_wamp_port, 'MQTT:', conf_mqtt_port, 'FOX:', conf_fox_port)
12 changes: 6 additions & 6 deletions leaderless/ndb.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,22 +33,22 @@ const readyQuorum = new QuorumEdge((advanceSegment, segmentId) => {

function mkSync(uri, ssId) {
console.log('connect to sync:', ssId, uri)
const connection = new autobahn.Connection({url: uri, realm: 'ctrl'})
const connection = new autobahn.Connection({url: uri, realm: 'sys'})

connection.onopen = function (session, details) {
session.log('Session open', ssId)
console.log('sync session open', ssId, uri)
syncMass.set(ssId, session)
runQuorum.addMember(ssId)
readyQuorum.addMember(ssId)

session.subscribe('draftSegmentId', (args, kwargs, opts) => {
console.log('draftSegmentId', ssId, kwargs)
console.log('=> draftSegmentId', ssId, args, kwargs)
runQuorum.vote(ssId, kwargs.applicantId, kwargs.runId)
maxId = mergeMax(maxId, kwargs.runId)
})

session.subscribe('commitSegment', (args, kwargs, opts) => {
console.log('commitSegment', ssId, kwargs)
console.log('=> commitSegment', ssId, args, kwargs)
readyQuorum.vote(ssId, kwargs.advanceSegment, kwargs.readyId)
})
}
Expand All @@ -67,7 +67,7 @@ function mkGate(uri, gateId, history, modKv, heapApi) {
const connection = new autobahn.Connection({url: uri, realm: 'sys'})

connection.onopen = function (session, details) {
console.log('Gate session open', gateId, uri)
console.log('gate session open', gateId, uri)
gateMass.set(
gateId,
new EntrySession(session, syncMass, history, gateId, (advanceSegment, segment, effectId) => {
Expand Down Expand Up @@ -131,7 +131,7 @@ async function main () {
}

main().then(() => {
console.log('DONE.')
console.log('connect function started')
}, (err) => {
console.error('ERROR:', err, err.stack)
})
38 changes: 21 additions & 17 deletions leaderless/sync.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,43 +13,47 @@ const {mergeMin, keyDate, MakeId} = require('../lib/allot/makeid')

const makeId = new MakeId(() => keyDate(new Date()))
const app = new Router()
const realm = new BaseRealm(app, new BaseEngine())
app.addRealm('ctrl', realm)
const api = realm.api()
const sysRealm = new BaseRealm(app, new BaseEngine())
app.addRealm('sys', sysRealm)
const api = sysRealm.api()
/*const server = */new WampServer(new WampGate(app), { port: conf_wamp_port })
console.log('Listening WAMP port:', conf_wamp_port)

makeId.update()
setInterval(()=>{makeId.update()}, 7000)

const makeQuorum = new QuorumEdge((applicantId, value) => {
const id = makeId.makeIdRec(value)
console.log('draftSegmentId:', value, applicantId, '=>', id)
api.publish('draftSegmentId', {kv: {applicantId, runId: id}})
}, (a,b) => Math.max(a,b))
const makeQuorum = new QuorumEdge(
(applicantId, value) => {
const id = makeId.makeIdRec(value)
console.log('makeQuorum:draftSegmentId', value, applicantId, '=>', id)
api.publish('draftSegmentId', null, {headers: {applicantId, runId: id}})
},
(a,b) =>
Math.max(a,b)
)

const syncQuorum = new QuorumEdge((advanceSegment, value) => {
console.log('QSYNC!', advanceSegment, '=>', value)
api.publish(['commitSegment'], {kv: {advanceSegment, readyId: value}})
api.publish('commitSegment', null, {headers: {advanceSegment, readyId: value}})
}, mergeMin)

realm.on(MSG.SESSION_JOIN, (session) => {
sysRealm.on(MSG.SESSION_JOIN, (session) => {
makeQuorum.addMember(session.getSid())
syncQuorum.addMember(session.getSid())
})

realm.on(MSG.SESSION_LEAVE, (session) => {
sysRealm.on(MSG.SESSION_LEAVE, (session) => {
makeQuorum.delMember(session.getSid())
syncQuorum.delMember(session.getSid())
})

api.subscribe(['makeSegmentId'], (data, opt) => {
console.log('MAKE-ID', opt.sid, data.kwargs)
makeQuorum.vote(opt.sid, data.kwargs.advanceSegment, data.kwargs.step)
api.subscribe('makeSegmentId', (data, opt) => {
console.log('=> receive MAKE-ID', data, opt.headers)
makeQuorum.vote(opt.sid, opt.headers.advanceSegment, opt.headers.step)
})

api.subscribe(['syncId'], (data, opt) => {
api.subscribe('syncId', (data, opt) => {
console.log('SYNC-ID', data, opt)
makeId.reconcilePos(data.kwargs.maxId.dt, data.kwargs.maxId.id)
syncQuorum.vote(opt.sid, data.kwargs.advanceSegment, data.kwargs.syncId)
makeId.reconcilePos(opt.headers.maxId.dt, opt.headers.maxId.id)
syncQuorum.vote(opt.sid, opt.headers.advanceSegment, opt.headers.syncId)
})
16 changes: 8 additions & 8 deletions lib/allot/entry_session.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ class HistorySegment {
}

class EntrySession {
constructor (wampSession, syncMass, history, gateId, pubResult) {
constructor (wampSession, syncMass, historyDb, gateId, pubResult) {
this.wampSession = wampSession
this.stackAdvanceSegment = []
this.curAdvanceSegment = undefined
this.syncMass = syncMass
this.history = history
this.historyDb = historyDb
this.gateId = gateId
this.pubResult = pubResult
this.segmentToWrite = new Map()
this.isEventSource = false

wampSession.subscribe('beginSegment', (args, kwargs, opts) => {
console.log('beginSegment', kwargs)
this.queueSync(kwargs.advanceSegment)
wampSession.publish('beginSegmentAccepted', [], {advanceSegment: kwargs.advanceSegment})
const advanceSegment = args[0].advanceSegment
this.queueSync(advanceSegment)
wampSession.publish('beginSegmentAccepted', [{advanceSegment: advanceSegment}])
})

wampSession.subscribe('saveHistory', (args, kwargs, opts) => {
Expand All @@ -51,7 +51,7 @@ class EntrySession {
{ pid: process.pid },
{ acknowledge: true, retain: true, when: null, will: null, watch: true, exclude_me: false }
).then((result) => {
console.log('at '+this.gateId+': selected as event source', result)
console.log('GATE:'+this.gateId+': use that db as event source', result)
this.isEventSource = true
})
}
Expand All @@ -69,7 +69,7 @@ class EntrySession {
}

sendToMakeSegment (advanceSegment) {
console.log('sendToMakeSegment on SYNC advanceSegment[', advanceSegment, "]")
console.log('sendToMakeSegment on SYNC advanceSegment[', advanceSegment, '] to count:', this.syncMass.size)
for (let [,ss] of this.syncMass) {
ss.publish('makeSegmentId', [], {advanceSegment, step: 2})
}
Expand Down Expand Up @@ -134,7 +134,7 @@ class EntrySession {

for (let row of segment.content) {
let id = makeId.makeIdStr()
this.history.saveEventHistory(id, row.realm, row.uri, row.data, row.opt)
this.historyDb.saveEventHistory(id, row.realm, row.uri, row.data, row.opt)
result.push(id) // keep event position in result array
}
return result
Expand Down
33 changes: 24 additions & 9 deletions lib/allot/netbinder.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ class HistorySegment {
}
return actor
}

getAdvanceSegment() {
return this.advanceSegment
}
}

class NetEngine extends PromiseEngine {
Expand All @@ -51,16 +55,28 @@ class NetBinder extends PromiseBinder {
this.sysApi = this.sysRealm.api()

this.sysApi.subscribe(['beginSegmentAccepted'], (data, opt) => {
// it is time to create new segment if it is necessary
this.curSegment = null
if (!data.advanceSegment) {
console.error('ERROR: no advanceSegment in package')
}
if (this.curSegment) {
if (data.advanceSegment == this.curSegment.getAdvanceSegment()) {
// it is time to create new segment if it is necessary
this.curSegment = null
console.log('advance segment completed', data.advanceSegment)
} else {
console.warn('warn: new segment is not accepted, cur:', this.curSegment.getAdvanceSegment(), 'inbound:', data.advanceSegment)
}
}
})

this.sysApi.subscribe(['ackSegment'], (data, opt) => {
this.ackSegment(data.kwargs)
console.log('=> ackSegment', data, opt.headers)
this.ackSegment(opt.headers)
})

this.sysApi.subscribe(['dispatchEvent'], (data, opt) => {
this.dispatchEvent(data.kwargs, opt)
console.log('=> dispatchEvent', data, opt.headers)
this.dispatchEvent(opt.headers)
})
}

Expand All @@ -72,9 +88,9 @@ class NetBinder extends PromiseBinder {
let curAdvanceSegment = '' + this.router.getId() + '-' + this.advanceSegmentGen
this.curSegment = new HistorySegment(curAdvanceSegment)
this.segments.set(curAdvanceSegment, this.curSegment)
this.sysApi.publish(['beginSegment'], {kv:{
this.sysApi.publish('beginSegment', {
advanceSegment: curAdvanceSegment
}})
})
return this.curSegment
}

Expand Down Expand Up @@ -113,7 +129,7 @@ class NetBinder extends PromiseBinder {
let segment = this.getSegment()
let advanceId = segment.addActor(actor)

return this.sysApi.publish(['saveHistory'], {kv:{
return this.sysApi.publish('saveHistory', null, { headers:{
advanceId: advanceId,
realm: engine.getRealmName(),
data: makeDataSerializable(actor.getData()),
Expand All @@ -137,10 +153,9 @@ class NetBinder extends PromiseBinder {
)
}

dispatchEvent (eventData, opt) {
dispatchEvent (eventData) {
const realm = this.router.findRealm(eventData.realm)
if (realm) {
console.log('net-disperseToSubs', eventData.realm, eventData, opt)
realm.getEngine().disperseToSubs({
qid: eventData.qid,
uri: eventData.uri,
Expand Down
13 changes: 11 additions & 2 deletions lib/hyper/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ function localError(action, cmd) {
function localEvent(action, cmd) {
let eventOpt = {
publication: cmd.qid,
topic: restoreUri(cmd.uri)
topic: restoreUri(cmd.uri),
headers: cmd.hdr
}
action.cb(getBodyValue(cmd.data), eventOpt)
}
Expand Down Expand Up @@ -202,7 +203,7 @@ function HyperClient (realm, ctx) {
let result
let subContainer
let ack = false
if (opt.acknowledge) {
if ('acknowledge' in opt) {
ack = true
delete opt.acknowledge
subContainer = {}
Expand All @@ -213,12 +214,20 @@ function HyperClient (realm, ctx) {
} else {
result = Promise.resolve()
}
let headers
if ('headers' in opt) {
headers = opt.headers
delete opt.headers
} else {
headers = {}
}
// do not wait if no ack in request
realm.cmdPush(ctx, {
id: subContainer,
uri: defaultParse(uri),
opt,
data: parseHyperBody(data),
hdr: headers,
ack
})
return result
Expand Down
11 changes: 11 additions & 0 deletions lib/realm.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Actor {
this.ctx.sendError(this.msg, errorCode, text)
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}
}

Expand Down Expand Up @@ -85,6 +86,7 @@ class ActorEcho extends Actor {
this.ctx.sendOkey(this.msg)
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}
}
}
Expand Down Expand Up @@ -136,6 +138,7 @@ class ActorCall extends Actor {
})
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}

const subD = this.getRegistration()
Expand Down Expand Up @@ -188,6 +191,7 @@ class ActorTrace extends Actor {
this.ctx.sendEvent(cmd)
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}
}

Expand Down Expand Up @@ -217,6 +221,7 @@ class ActorTrace extends Actor {
this.ctx.sendSubscribed(this.msg)
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}
}

Expand All @@ -225,6 +230,7 @@ class ActorTrace extends Actor {
this.ctx.sendEndSubscribe(this.msg)
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}
}
}
Expand Down Expand Up @@ -254,6 +260,7 @@ class ActorReg extends ActorTrace {
})
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}
}

Expand Down Expand Up @@ -282,6 +289,7 @@ class ActorReg extends ActorTrace {
this.ctx.sendRegistered(this.msg)
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}
}

Expand Down Expand Up @@ -313,6 +321,7 @@ class ActorPush extends Actor {
this.ctx.sendPublished({id: this.msg.id, qid: this.eventId})
} catch (e) {
this.ctx.setSendFailed(e)
throw e
}
}
}
Expand Down Expand Up @@ -778,6 +787,7 @@ class BaseRealm extends EventEmitter {
ctx.sendUnregistered(cmd)
} catch (e) {
ctx.setSendFailed(e)
throw e
}
return registration.getUri()
} else {
Expand Down Expand Up @@ -840,6 +850,7 @@ class BaseRealm extends EventEmitter {
ctx.sendUnsubscribed(cmd)
} catch (e) {
ctx.setSendFailed(e)
throw e
}
return subscription.getUri()
} else {
Expand Down
5 changes: 1 addition & 4 deletions lib/wamp/gate.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,8 @@ const Context = require('../context')
let handlers = {}

function toWampArgs (body) {
if ('args' in body) {
return body.args
}
let value = getBodyValue(body)
if (Array.isArray(value)) {
if (value === null || Array.isArray(value)) {
return value
}
return [value]
Expand Down
Loading

0 comments on commit 079b772

Please sign in to comment.