From 079b7721b57f1b1fc1b34b6fe4540b704e059ac1 Mon Sep 17 00:00:00 2001 From: Anatoly Tsapkov Date: Sun, 8 Dec 2024 23:13:28 -0500 Subject: [PATCH] leaderless back online --- leaderless/entry.js | 4 +-- leaderless/ndb.js | 12 ++++---- leaderless/sync.js | 38 +++++++++++++----------- lib/allot/entry_session.js | 16 +++++----- lib/allot/netbinder.js | 33 ++++++++++++++------ lib/hyper/client.js | 13 ++++++-- lib/realm.js | 11 +++++++ lib/wamp/gate.js | 5 +--- package-lock.json | 4 +-- package.json | 2 +- supervisor/leaderless.supervisord.ini | 6 ++-- supervisor/resource-lock.supervisord.ini | 6 ++-- test/10.client.js | 2 +- 13 files changed, 94 insertions(+), 58 deletions(-) diff --git a/leaderless/entry.js b/leaderless/entry.js index f06eb09..2e7cd44 100644 --- a/leaderless/entry.js +++ b/leaderless/entry.js @@ -10,7 +10,7 @@ const conf_fox_port = process.env.FOX_PORT || console.log('FOX_PORT must be defined') || process.exit(1) const conf_node_id = process.env.NODE_ID - || console.log('ID must be defined') || process.exit(1) + || console.log('NODE_ID must be defined') || process.exit(1) const Router = require('../lib/router') const { BaseRealm } = require('../lib/realm') @@ -32,4 +32,4 @@ new WampServer(new WampGate(router), { port: conf_wamp_port }) new MqttServer(new MqttGate(router), { port: conf_mqtt_port }) new FoxNetServer(new FoxGate(router), { port: conf_fox_port }) -console.log('Listening WAMP port:', conf_wamp_port) +console.log('at NODE_ID:', conf_node_id, 'listening WAMP:', conf_wamp_port, 'MQTT:', conf_mqtt_port, 'FOX:', conf_fox_port) diff --git a/leaderless/ndb.js b/leaderless/ndb.js index 3f3a944..d14fef7 100644 --- a/leaderless/ndb.js +++ b/leaderless/ndb.js @@ -33,22 +33,22 @@ const readyQuorum = new QuorumEdge((advanceSegment, segmentId) => { function mkSync(uri, ssId) { console.log('connect to sync:', ssId, uri) - const connection = new autobahn.Connection({url: uri, realm: 'ctrl'}) + const connection = new autobahn.Connection({url: uri, realm: 'sys'}) connection.onopen = function (session, details) { - session.log('Session open', ssId) + console.log('sync session open', ssId, uri) syncMass.set(ssId, session) runQuorum.addMember(ssId) readyQuorum.addMember(ssId) session.subscribe('draftSegmentId', (args, kwargs, opts) => { - console.log('draftSegmentId', ssId, kwargs) + console.log('=> draftSegmentId', ssId, args, kwargs) runQuorum.vote(ssId, kwargs.applicantId, kwargs.runId) maxId = mergeMax(maxId, kwargs.runId) }) session.subscribe('commitSegment', (args, kwargs, opts) => { - console.log('commitSegment', ssId, kwargs) + console.log('=> commitSegment', ssId, args, kwargs) readyQuorum.vote(ssId, kwargs.advanceSegment, kwargs.readyId) }) } @@ -67,7 +67,7 @@ function mkGate(uri, gateId, history, modKv, heapApi) { const connection = new autobahn.Connection({url: uri, realm: 'sys'}) connection.onopen = function (session, details) { - console.log('Gate session open', gateId, uri) + console.log('gate session open', gateId, uri) gateMass.set( gateId, new EntrySession(session, syncMass, history, gateId, (advanceSegment, segment, effectId) => { @@ -131,7 +131,7 @@ async function main () { } main().then(() => { - console.log('DONE.') + console.log('connect function started') }, (err) => { console.error('ERROR:', err, err.stack) }) diff --git a/leaderless/sync.js b/leaderless/sync.js index d9b2a58..9c3e4e6 100644 --- a/leaderless/sync.js +++ b/leaderless/sync.js @@ -13,43 +13,47 @@ const {mergeMin, keyDate, MakeId} = require('../lib/allot/makeid') const makeId = new MakeId(() => keyDate(new Date())) const app = new Router() -const realm = new BaseRealm(app, new BaseEngine()) -app.addRealm('ctrl', realm) -const api = realm.api() +const sysRealm = new BaseRealm(app, new BaseEngine()) +app.addRealm('sys', sysRealm) +const api = sysRealm.api() /*const server = */new WampServer(new WampGate(app), { port: conf_wamp_port }) console.log('Listening WAMP port:', conf_wamp_port) makeId.update() setInterval(()=>{makeId.update()}, 7000) -const makeQuorum = new QuorumEdge((applicantId, value) => { - const id = makeId.makeIdRec(value) - console.log('draftSegmentId:', value, applicantId, '=>', id) - api.publish('draftSegmentId', {kv: {applicantId, runId: id}}) -}, (a,b) => Math.max(a,b)) +const makeQuorum = new QuorumEdge( + (applicantId, value) => { + const id = makeId.makeIdRec(value) + console.log('makeQuorum:draftSegmentId', value, applicantId, '=>', id) + api.publish('draftSegmentId', null, {headers: {applicantId, runId: id}}) + }, + (a,b) => + Math.max(a,b) +) const syncQuorum = new QuorumEdge((advanceSegment, value) => { console.log('QSYNC!', advanceSegment, '=>', value) - api.publish(['commitSegment'], {kv: {advanceSegment, readyId: value}}) + api.publish('commitSegment', null, {headers: {advanceSegment, readyId: value}}) }, mergeMin) -realm.on(MSG.SESSION_JOIN, (session) => { +sysRealm.on(MSG.SESSION_JOIN, (session) => { makeQuorum.addMember(session.getSid()) syncQuorum.addMember(session.getSid()) }) -realm.on(MSG.SESSION_LEAVE, (session) => { +sysRealm.on(MSG.SESSION_LEAVE, (session) => { makeQuorum.delMember(session.getSid()) syncQuorum.delMember(session.getSid()) }) -api.subscribe(['makeSegmentId'], (data, opt) => { - console.log('MAKE-ID', opt.sid, data.kwargs) - makeQuorum.vote(opt.sid, data.kwargs.advanceSegment, data.kwargs.step) +api.subscribe('makeSegmentId', (data, opt) => { + console.log('=> receive MAKE-ID', data, opt.headers) + makeQuorum.vote(opt.sid, opt.headers.advanceSegment, opt.headers.step) }) -api.subscribe(['syncId'], (data, opt) => { +api.subscribe('syncId', (data, opt) => { console.log('SYNC-ID', data, opt) - makeId.reconcilePos(data.kwargs.maxId.dt, data.kwargs.maxId.id) - syncQuorum.vote(opt.sid, data.kwargs.advanceSegment, data.kwargs.syncId) + makeId.reconcilePos(opt.headers.maxId.dt, opt.headers.maxId.id) + syncQuorum.vote(opt.sid, opt.headers.advanceSegment, opt.headers.syncId) }) diff --git a/lib/allot/entry_session.js b/lib/allot/entry_session.js index 559ab4e..8b5e0e2 100644 --- a/lib/allot/entry_session.js +++ b/lib/allot/entry_session.js @@ -17,21 +17,21 @@ class HistorySegment { } class EntrySession { - constructor (wampSession, syncMass, history, gateId, pubResult) { + constructor (wampSession, syncMass, historyDb, gateId, pubResult) { this.wampSession = wampSession this.stackAdvanceSegment = [] this.curAdvanceSegment = undefined this.syncMass = syncMass - this.history = history + this.historyDb = historyDb this.gateId = gateId this.pubResult = pubResult this.segmentToWrite = new Map() this.isEventSource = false wampSession.subscribe('beginSegment', (args, kwargs, opts) => { - console.log('beginSegment', kwargs) - this.queueSync(kwargs.advanceSegment) - wampSession.publish('beginSegmentAccepted', [], {advanceSegment: kwargs.advanceSegment}) + const advanceSegment = args[0].advanceSegment + this.queueSync(advanceSegment) + wampSession.publish('beginSegmentAccepted', [{advanceSegment: advanceSegment}]) }) wampSession.subscribe('saveHistory', (args, kwargs, opts) => { @@ -51,7 +51,7 @@ class EntrySession { { pid: process.pid }, { acknowledge: true, retain: true, when: null, will: null, watch: true, exclude_me: false } ).then((result) => { - console.log('at '+this.gateId+': selected as event source', result) + console.log('GATE:'+this.gateId+': use that db as event source', result) this.isEventSource = true }) } @@ -69,7 +69,7 @@ class EntrySession { } sendToMakeSegment (advanceSegment) { - console.log('sendToMakeSegment on SYNC advanceSegment[', advanceSegment, "]") + console.log('sendToMakeSegment on SYNC advanceSegment[', advanceSegment, '] to count:', this.syncMass.size) for (let [,ss] of this.syncMass) { ss.publish('makeSegmentId', [], {advanceSegment, step: 2}) } @@ -134,7 +134,7 @@ class EntrySession { for (let row of segment.content) { let id = makeId.makeIdStr() - this.history.saveEventHistory(id, row.realm, row.uri, row.data, row.opt) + this.historyDb.saveEventHistory(id, row.realm, row.uri, row.data, row.opt) result.push(id) // keep event position in result array } return result diff --git a/lib/allot/netbinder.js b/lib/allot/netbinder.js index 5157b0c..0e01cd0 100644 --- a/lib/allot/netbinder.js +++ b/lib/allot/netbinder.js @@ -29,6 +29,10 @@ class HistorySegment { } return actor } + + getAdvanceSegment() { + return this.advanceSegment + } } class NetEngine extends PromiseEngine { @@ -51,16 +55,28 @@ class NetBinder extends PromiseBinder { this.sysApi = this.sysRealm.api() this.sysApi.subscribe(['beginSegmentAccepted'], (data, opt) => { - // it is time to create new segment if it is necessary - this.curSegment = null + if (!data.advanceSegment) { + console.error('ERROR: no advanceSegment in package') + } + if (this.curSegment) { + if (data.advanceSegment == this.curSegment.getAdvanceSegment()) { + // it is time to create new segment if it is necessary + this.curSegment = null + console.log('advance segment completed', data.advanceSegment) + } else { + console.warn('warn: new segment is not accepted, cur:', this.curSegment.getAdvanceSegment(), 'inbound:', data.advanceSegment) + } + } }) this.sysApi.subscribe(['ackSegment'], (data, opt) => { - this.ackSegment(data.kwargs) + console.log('=> ackSegment', data, opt.headers) + this.ackSegment(opt.headers) }) this.sysApi.subscribe(['dispatchEvent'], (data, opt) => { - this.dispatchEvent(data.kwargs, opt) + console.log('=> dispatchEvent', data, opt.headers) + this.dispatchEvent(opt.headers) }) } @@ -72,9 +88,9 @@ class NetBinder extends PromiseBinder { let curAdvanceSegment = '' + this.router.getId() + '-' + this.advanceSegmentGen this.curSegment = new HistorySegment(curAdvanceSegment) this.segments.set(curAdvanceSegment, this.curSegment) - this.sysApi.publish(['beginSegment'], {kv:{ + this.sysApi.publish('beginSegment', { advanceSegment: curAdvanceSegment - }}) + }) return this.curSegment } @@ -113,7 +129,7 @@ class NetBinder extends PromiseBinder { let segment = this.getSegment() let advanceId = segment.addActor(actor) - return this.sysApi.publish(['saveHistory'], {kv:{ + return this.sysApi.publish('saveHistory', null, { headers:{ advanceId: advanceId, realm: engine.getRealmName(), data: makeDataSerializable(actor.getData()), @@ -137,10 +153,9 @@ class NetBinder extends PromiseBinder { ) } - dispatchEvent (eventData, opt) { + dispatchEvent (eventData) { const realm = this.router.findRealm(eventData.realm) if (realm) { - console.log('net-disperseToSubs', eventData.realm, eventData, opt) realm.getEngine().disperseToSubs({ qid: eventData.qid, uri: eventData.uri, diff --git a/lib/hyper/client.js b/lib/hyper/client.js index 90deefc..4e5bca0 100644 --- a/lib/hyper/client.js +++ b/lib/hyper/client.js @@ -24,7 +24,8 @@ function localError(action, cmd) { function localEvent(action, cmd) { let eventOpt = { publication: cmd.qid, - topic: restoreUri(cmd.uri) + topic: restoreUri(cmd.uri), + headers: cmd.hdr } action.cb(getBodyValue(cmd.data), eventOpt) } @@ -202,7 +203,7 @@ function HyperClient (realm, ctx) { let result let subContainer let ack = false - if (opt.acknowledge) { + if ('acknowledge' in opt) { ack = true delete opt.acknowledge subContainer = {} @@ -213,12 +214,20 @@ function HyperClient (realm, ctx) { } else { result = Promise.resolve() } + let headers + if ('headers' in opt) { + headers = opt.headers + delete opt.headers + } else { + headers = {} + } // do not wait if no ack in request realm.cmdPush(ctx, { id: subContainer, uri: defaultParse(uri), opt, data: parseHyperBody(data), + hdr: headers, ack }) return result diff --git a/lib/realm.js b/lib/realm.js index 3aa52cd..71d9a76 100644 --- a/lib/realm.js +++ b/lib/realm.js @@ -47,6 +47,7 @@ class Actor { this.ctx.sendError(this.msg, errorCode, text) } catch (e) { this.ctx.setSendFailed(e) + throw e } } @@ -85,6 +86,7 @@ class ActorEcho extends Actor { this.ctx.sendOkey(this.msg) } catch (e) { this.ctx.setSendFailed(e) + throw e } } } @@ -136,6 +138,7 @@ class ActorCall extends Actor { }) } catch (e) { this.ctx.setSendFailed(e) + throw e } const subD = this.getRegistration() @@ -188,6 +191,7 @@ class ActorTrace extends Actor { this.ctx.sendEvent(cmd) } catch (e) { this.ctx.setSendFailed(e) + throw e } } @@ -217,6 +221,7 @@ class ActorTrace extends Actor { this.ctx.sendSubscribed(this.msg) } catch (e) { this.ctx.setSendFailed(e) + throw e } } @@ -225,6 +230,7 @@ class ActorTrace extends Actor { this.ctx.sendEndSubscribe(this.msg) } catch (e) { this.ctx.setSendFailed(e) + throw e } } } @@ -254,6 +260,7 @@ class ActorReg extends ActorTrace { }) } catch (e) { this.ctx.setSendFailed(e) + throw e } } @@ -282,6 +289,7 @@ class ActorReg extends ActorTrace { this.ctx.sendRegistered(this.msg) } catch (e) { this.ctx.setSendFailed(e) + throw e } } @@ -313,6 +321,7 @@ class ActorPush extends Actor { this.ctx.sendPublished({id: this.msg.id, qid: this.eventId}) } catch (e) { this.ctx.setSendFailed(e) + throw e } } } @@ -778,6 +787,7 @@ class BaseRealm extends EventEmitter { ctx.sendUnregistered(cmd) } catch (e) { ctx.setSendFailed(e) + throw e } return registration.getUri() } else { @@ -840,6 +850,7 @@ class BaseRealm extends EventEmitter { ctx.sendUnsubscribed(cmd) } catch (e) { ctx.setSendFailed(e) + throw e } return subscription.getUri() } else { diff --git a/lib/wamp/gate.js b/lib/wamp/gate.js index 1b66241..a3dfe72 100644 --- a/lib/wamp/gate.js +++ b/lib/wamp/gate.js @@ -11,11 +11,8 @@ const Context = require('../context') let handlers = {} function toWampArgs (body) { - if ('args' in body) { - return body.args - } let value = getBodyValue(body) - if (Array.isArray(value)) { + if (value === null || Array.isArray(value)) { return value } return [value] diff --git a/package-lock.json b/package-lock.json index d6014d4..394c15d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "fox-wamp", - "version": "0.7.11", + "version": "0.7.20", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "fox-wamp", - "version": "0.7.11", + "version": "0.7.20", "license": "MIT", "dependencies": { "commander": "^2.20.3", diff --git a/package.json b/package.json index 0e8f338..ce5d4f3 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fox-wamp", - "version": "0.7.11", + "version": "0.7.20", "description": "Web Application Message Router/Server WAMP/MQTT", "author": { "name": "Anatoly Tsapkov", diff --git a/supervisor/leaderless.supervisord.ini b/supervisor/leaderless.supervisord.ini index b321a3e..906b0ea 100644 --- a/supervisor/leaderless.supervisord.ini +++ b/supervisor/leaderless.supervisord.ini @@ -33,7 +33,7 @@ priority=1 [program:entry-1] command=./prefix-log.sh node ../leaderless/entry.js -environment=WAMP_PORT=9031,MQTT_PORT=1881,ID=E1 +environment=FOX_PORT=1735,WAMP_PORT=9031,MQTT_PORT=1881,NODE_ID=E1 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr @@ -42,7 +42,7 @@ priority=2 [program:entry-2] command=./prefix-log.sh node ../leaderless/entry.js -environment=WAMP_PORT=9032,MQTT_PORT=1882,ID=E2 +environment=FOX_PORT=1736,WAMP_PORT=9032,MQTT_PORT=1882,NODE_ID=E2 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr @@ -51,7 +51,7 @@ priority=2 [program:entry-3] command=./prefix-log.sh node ../leaderless/entry.js -environment=WAMP_PORT=9033,MQTT_PORT=1883,ID=E3 +environment=FOX_PORT=1737,WAMP_PORT=9033,MQTT_PORT=1883,NODE_ID=E3 stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 stderr_logfile=/dev/stderr diff --git a/supervisor/resource-lock.supervisord.ini b/supervisor/resource-lock.supervisord.ini index d79c961..24ecb1e 100644 --- a/supervisor/resource-lock.supervisord.ini +++ b/supervisor/resource-lock.supervisord.ini @@ -12,7 +12,7 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 priority=1 -[program:resource1] +[program:worker1] command=./prefix-log.sh node ../democli/resource-lock.js stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 @@ -20,7 +20,7 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 priority=2 -[program:resource2] +[program:worker2] command=./prefix-log.sh node ../democli/resource-lock.js stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 @@ -28,7 +28,7 @@ stderr_logfile=/dev/stderr stderr_logfile_maxbytes=0 priority=2 -[program:resource3] +[program:worker3] command=./prefix-log.sh node ../democli/resource-lock.js stdout_logfile=/dev/stdout stdout_logfile_maxbytes=0 diff --git a/test/10.client.js b/test/10.client.js index 7dee134..dbc2d92 100644 --- a/test/10.client.js +++ b/test/10.client.js @@ -87,7 +87,7 @@ describe('10 clent', function () { // TODO: where is publication-id in opt? expect(result.shift()).to.deep.equal([ 'event-pkg', - {topic: 'queue.name', publication: 1234567} + {topic: 'queue.name', publication: 1234567, headers: undefined} ]) })