From 0f4918c83a33469d0273fbce9a3beb7bf5813ce1 Mon Sep 17 00:00:00 2001 From: Anatoly Tsapkov Date: Wed, 16 Jan 2019 23:52:18 -0500 Subject: [PATCH] optimize data field structure --- README.md | 23 ++++++++--- bin/mqtt_gate.js | 6 +-- democli/metatest.js | 90 ++++++++++++++++++++--------------------- lib/hyper/clientBase.js | 63 ++++++++++++++--------------- lib/hyper/gate.js | 4 +- package-lock.json | 28 ++++++------- package.json | 2 +- test/broker.js | 18 ++++----- test/client.js | 14 +++---- 9 files changed, 129 insertions(+), 119 deletions(-) diff --git a/README.md b/README.md index 9855ad3..960c37f 100644 --- a/README.md +++ b/README.md @@ -84,12 +84,25 @@ publish('the.key', ['args'], {kwArgs:false}, { ### Aggregate Engine for the data streams -The functionality aimed to provide rapid access to continuously changed data to the web application. -Aggregate engine provides data update propagation for the subscribed clients. -The idea is to have definitions of cross table relations and calculation rules. +What if to define table structure with aggregation functions along? + +The functionality aimed to provide rapid access to continuously changed +data to the web application. + +The idea is to have definitions of cross table relations and calculation rules in one place. +Such table scheme could easy listen to the events stream and do changes +in the related tables accordingly. + +The changes in tables could be transformed and +propagated as same events to the another aggregation tables +where it could be mixed with another sources. +Aggregate engine provides data change events for the subscribed clients. + +In general the idea looks like materialized view that is based on event stream. + ```javascript - "invoice" { + "invoice": { "type": "object", "properties": { "date": { "type": "string" }, @@ -105,7 +118,7 @@ The idea is to have definitions of cross table relations and calculation rules. } }, - "detail" { + "detail": { "type": "aggregate", "properties": { "customer": { "type": "string" }, diff --git a/bin/mqtt_gate.js b/bin/mqtt_gate.js index 08dd373..a1382f9 100644 --- a/bin/mqtt_gate.js +++ b/bin/mqtt_gate.js @@ -4,9 +4,9 @@ var program = require('commander'); program - .option('-p, --wamp ', 'WAMP Server IP port', 9000) - .option('-q, --mqtt ', 'MQTT Server IP port', 1883) - .parse(process.argv); + .option('-p, --wamp ', 'WAMP Server IP port', 9000) + .option('-q, --mqtt ', 'MQTT Server IP port', 1883) + .parse(process.argv); var app = new Router(); app.setLogTrace(true); diff --git a/democli/metatest.js b/democli/metatest.js index 0dd39b4..519c69b 100644 --- a/democli/metatest.js +++ b/democli/metatest.js @@ -15,61 +15,61 @@ var connection = new autobahn.Connection({ connection.onopen = function (session, details) { - session.log("Session open."); + session.log("Session open."); - session.subscribe('wamp.session.on_join', function (publishArgs, kwargs, opts) { - console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs); - }).then( - function(subscription) { - console.log("subscription successfull wamp.session.on_join"); - } - ); + session.subscribe('wamp.session.on_join', function (publishArgs, kwargs, opts) { + console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs); + }).then( + function(subscription) { + console.log("subscription successfull wamp.session.on_join"); + } + ); - session.subscribe('wamp.session.on_leave', function (publishArgs, kwargs, opts) { - console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs); - }).then( - function(subscription) { - console.log("subscription successfull wamp.session.on_leave"); - } - ); + session.subscribe('wamp.session.on_leave', function (publishArgs, kwargs, opts) { + console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs); + }).then( + function(subscription) { + console.log("subscription successfull wamp.session.on_leave"); + } + ); - session.call('wamp.registration.get').then( - function (result) { - session.log("registration.get =", typeof(result), result); - }, - function (error) { - console.log("Call failed:", error); - }); + session.call('wamp.registration.get').then( + function (result) { + session.log("registration.get =", typeof(result), result); + }, + function (error) { + console.log("Call failed:", error); + }); - session.call('wamp.session.count').then( - function (result) { - session.log("count =", typeof(result), result); - }, - function (error) { - console.log("Call failed:", error); - }); + session.call('wamp.session.count').then( + function (result) { + session.log("count =", typeof(result), result); + }, + function (error) { + console.log("Call failed:", error); + }); var sessions = null; session.call('wamp.session.list').then( - function (result) { - sessions = result; - session.log("list =", typeof(result), result); + function (result) { + sessions = result; + session.log("list =", typeof(result), result); - session.call('wamp.session.get', [sessions[0]]).then( - function (result) { - session.log("get =", typeof(result), result); - }, - function (error) { - console.log("Call failed:", error); - }); - }, - function (error) { - console.log("Call failed:", error); - }); -}; + session.call('wamp.session.get', [sessions[0]]).then( + function (result) { + session.log("get =", typeof(result), result); + }, + function (error) { + console.log("Call failed:", error); + }); + }, + function (error) { + console.log("Call failed:", error); + }); + }; connection.onclose = function (reason, details) { - console.log("close connection:", reason, details); + console.log("close connection:", reason, details); }; connection.open(); diff --git a/lib/hyper/clientBase.js b/lib/hyper/clientBase.js index 5024cf9..88b46eb 100644 --- a/lib/hyper/clientBase.js +++ b/lib/hyper/clientBase.js @@ -73,51 +73,27 @@ CommandBase.prototype.getCommand = function () return this.command; }; -function parseData(data) { - var result = undefined; - - if (data) { - if (data.hasOwnProperty('kv')) { - result = data.kv; - } - else if (data.hasOwnProperty('message')) { - // exception - result = data; - } - else if (data.hasOwnProperty('kwargs')) { - if (Array.isArray(data.args) && data.args.length > 0) { - result = data; - } - else { - result = data.kwargs; - } - } - } - return result; -} - // data could be array, task or request CommandBase.prototype.settle = function(client, cmd) { var mode = cmd.rsp || ''; - var data = parseData(cmd.data); switch (mode) { - case RESULT_ACK: this.resolve(data); return false; - case RESULT_OK: this.resolve(data); return true; - case RESULT_ERR: this.reject (data); return true; - case RESULT_EMIT: if (this.callback) this.callback(data); return false; + case RESULT_ACK: this.resolve(cmd.data); return false; + case RESULT_OK: this.resolve(cmd.data); return true; + case RESULT_ERR: this.reject (cmd.data); return true; + case RESULT_EMIT: if (this.callback) this.callback(cmd.data); return false; case REQUEST_TASK: case REQUEST_EVENT: if (this.callback) { var task = new Task(client.sendTaskResponse, cmd); - this.callback(data, task); + this.callback(cmd.data, task); } return false; default: - this.reject(data); + this.reject(cmd.data); return true; } }; @@ -204,7 +180,7 @@ function ClientBase() { commandId++; command.id = commandId; if (undefined !== data) { - command.data = { kv: data }; + command.data = data; } this.sender.send(command); @@ -219,7 +195,7 @@ function ClientBase() { var header = { qid: request.qid, rqt: responseMode, - data: { kv:data } + data: data }; if (request.rsp === REQUEST_EVENT) { header.ft = 'CONFIRM'; // event confirmed @@ -241,6 +217,29 @@ function ClientBase() { }; } +ClientBase.prototype.parseData = function(data) { + var result = undefined; + + if (data) { + if (data.hasOwnProperty('kv')) { + result = data.kv; + } + else if (data.hasOwnProperty('message')) { + // exception + result = data; + } + else if (data.hasOwnProperty('kwargs')) { + if (Array.isArray(data.args) && data.args.length > 0) { + result = data; + } + else { + result = data.kwargs; + } + } + } + return result; +} + exports.CommandBase = CommandBase; exports.Login = Login; exports.Echo = Echo; diff --git a/lib/hyper/gate.js b/lib/hyper/gate.js index a944e1a..c51b316 100644 --- a/lib/hyper/gate.js +++ b/lib/hyper/gate.js @@ -151,7 +151,7 @@ cmdAck.REG = function (session, cmd) { session.send({ id: cmd.id, rsp: RESULT_ACK, - data: {kv:cmd.qid} // will unregister + data: cmd.qid // will unregister }); }; @@ -181,7 +181,7 @@ cmdAck.TRACE = function (session, cmd) { session.send({ id: cmd.id, rsp: RESULT_ACK, - data: {kv:cmd.qid} // will unregister + data: cmd.qid // will unregister }); }; diff --git a/package-lock.json b/package-lock.json index a1ddf85..ffefca8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,19 +1,19 @@ { "name": "fox-wamp", - "version": "0.5.6", + "version": "0.5.7", "lockfileVersion": 1, "requires": true, "dependencies": { "assertion-error": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/assertion-error/-/assertion-error-1.1.0.tgz", - "integrity": "sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==", + "integrity": "sha1-5gtrDo8wG9l+U3UhW9pAbIURjAs=", "dev": true }, "async-limiter": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/async-limiter/-/async-limiter-1.0.0.tgz", - "integrity": "sha512-jp/uFnooOiO+L211eZOoSyzpOITMXx1rBITauYykG3BRYPu8h0UcxsPNB04RR5vo4Tyz3+ay17tR6JVf9qzYWg==" + "integrity": "sha1-ePrtjD0HSrgfIrTphdeehzj3IPg=" }, "autobahn": { "version": "18.10.2", @@ -41,7 +41,7 @@ "bl": { "version": "1.2.2", "resolved": "https://registry.npmjs.org/bl/-/bl-1.2.2.tgz", - "integrity": "sha512-e8tQYnZodmebYDWGH7KMRvtzKXaJHx3BbilrgZCfvyLUYdKpK1t5PSPmpkny/SgiTSCnjfLW7v5rlONXVFkQEA==", + "integrity": "sha1-oWCRFxcQPAdBDO9j71Gzl8Alr5w=", "requires": { "readable-stream": "^2.3.5", "safe-buffer": "^5.1.1" @@ -86,7 +86,7 @@ "chai-as-promised": { "version": "7.1.1", "resolved": "https://registry.npmjs.org/chai-as-promised/-/chai-as-promised-7.1.1.tgz", - "integrity": "sha512-azL6xMoi+uxu6z4rhWQ1jbdUhOMhis2PvscD/xjLqNMkv3BPPp2JyyuTHOrf9BOosGpNQ11v6BKv/g57RXbiaA==", + "integrity": "sha1-CGRdgl3rhpbuYXJdv1kMAS6wDKA=", "dev": true, "requires": { "check-error": "^1.0.2" @@ -95,7 +95,7 @@ "chai-spies": { "version": "1.0.0", "resolved": "https://registry.npmjs.org/chai-spies/-/chai-spies-1.0.0.tgz", - "integrity": "sha512-elF2ZUczBsFoP07qCfMO/zeggs8pqCf3fZGyK5+2X4AndS8jycZYID91ztD9oQ7d/0tnS963dPkd0frQEThDsg==", + "integrity": "sha1-0Ws5M2+zFtA6v4w3X+sjwMi7Fj0=", "dev": true }, "check-error": { @@ -123,7 +123,7 @@ "deep-eql": { "version": "3.0.1", "resolved": "https://registry.npmjs.org/deep-eql/-/deep-eql-3.0.1.tgz", - "integrity": "sha512-+QeIQyN5ZuO+3Uk5DYh6/1eKO0m0YmJFGNmFHGACpf1ClL1nmlV/p4gNgbl2pJGxgXb4faqo6UE+M5ACEMyVcw==", + "integrity": "sha1-38lARACtHI/gI+faHfHBR8S0RN8=", "dev": true, "requires": { "type-detect": "^4.0.0" @@ -163,7 +163,7 @@ "mqtt-packet": { "version": "6.0.0", "resolved": "https://registry.npmjs.org/mqtt-packet/-/mqtt-packet-6.0.0.tgz", - "integrity": "sha512-x5Ku6kGa1624hma/gtfm8Hh6KzZ5MrjOiA32hZWiygXb1AbRxJzbZuleW2Ps5Chc9OkiA3xQ+j0sQkIVZL6YTw==", + "integrity": "sha1-u1OrKXBWh4+OPNQGTg8LZTZGKBs=", "requires": { "bl": "^1.2.1", "inherits": "^2.0.3", @@ -222,12 +222,12 @@ "process-nextick-args": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/process-nextick-args/-/process-nextick-args-2.0.0.tgz", - "integrity": "sha512-MtEC1TqN0EU5nephaJ4rAtThHtC86dNN9qCuEhtshvpVBkAW5ZO7BASN9REnF9eoXGcRub+pFuKEpOHE+HbEMw==" + "integrity": "sha1-o31zL0JxtKsa0HDTVQjoKQeI/6o=" }, "qlobber": { "version": "3.0.2", "resolved": "https://registry.npmjs.org/qlobber/-/qlobber-3.0.2.tgz", - "integrity": "sha512-WSBRhZqAgVpdq6nypgMUWjlvfCgc6lUIEENy6dwg60tN2sJBN5vFyADGNaSujYeZOepFPB1uFzgBAlK+L0RKVA==" + "integrity": "sha1-Qygs8QQXmWBe32FNgmGRfT+uizI=" }, "randombytes": { "version": "2.0.6", @@ -241,7 +241,7 @@ "readable-stream": { "version": "2.3.6", "resolved": "https://registry.npmjs.org/readable-stream/-/readable-stream-2.3.6.tgz", - "integrity": "sha512-tQtKA9WIAhBF3+VLAseyMqZeBjW0AHJoxOtYqSUZNJxauErmLbVm2FW1y+J/YA9dUrAC39ITejlZWhVIwawkKw==", + "integrity": "sha1-sRwn2IuP8fvgcGQ8+UsMea4bCq8=", "requires": { "core-util-is": "~1.0.0", "inherits": "~2.0.3", @@ -255,12 +255,12 @@ "safe-buffer": { "version": "5.1.2", "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", - "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==" + "integrity": "sha1-mR7GnSluAxN0fVm9/St0XDX4go0=" }, "string_decoder": { "version": "1.1.1", "resolved": "https://registry.npmjs.org/string_decoder/-/string_decoder-1.1.1.tgz", - "integrity": "sha512-n/ShnvDi6FHbbVfviro+WojiFzv+s8MPMHBczVePfUpDJLwoLT0ht1l4YwBCbi8pJAveEEdnkHyPyTP/mzRfwg==", + "integrity": "sha1-nPFhG6YmhdcDCunkujQUnDrwP8g=", "requires": { "safe-buffer": "~5.1.0" } @@ -274,7 +274,7 @@ "type-detect": { "version": "4.0.8", "resolved": "https://registry.npmjs.org/type-detect/-/type-detect-4.0.8.tgz", - "integrity": "sha512-0fr/mIH1dlO+x7TlcMy+bIDqKPsw/70tVyeHW787goQjhmqaZe10uwLujubK9q9Lg6Fiho1KUKDYz0Z7k7g5/g==", + "integrity": "sha1-dkb7XxiHHPu3dJ5pvTmmOI63RQw=", "dev": true }, "utf-8-validate": { diff --git a/package.json b/package.json index a2c76f9..638cc25 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fox-wamp", - "version": "0.5.6", + "version": "0.5.7", "description": "V2 Message Router, Basic WebSocket Application Messaging Protocol (WAMP/MQTT)", "author": { "name": "Anatoly Tsapkov", diff --git a/test/broker.js b/test/broker.js index 05819a5..3b47d42 100644 --- a/test/broker.js +++ b/test/broker.js @@ -15,7 +15,6 @@ chai.use(spies); describe('hyper-broker', function() { let - broker, router, gate, sender, @@ -33,7 +32,6 @@ describe('hyper-broker', function() { afterEach(function(){ session.cleanup(); - broker = null; session = null; }); @@ -108,7 +106,7 @@ describe('hyper-broker', function() { if (msg.id == idSub) { expect(msg.id).to.equal(idSub); expect(msg.rsp).to.equal(RESULT_ACK); - regSub = msg.data.kv; + regSub = msg.data; } else { expect(msg).to.deep.equal({ @@ -140,7 +138,7 @@ describe('hyper-broker', function() { if (msg.id == idTrace) { expect(msg.rsp).to.equal(RESULT_ACK); expect(msg.id).to.equal(idTrace); - regTrace = msg.data.kv; + regTrace = msg.data; } else { expect(msg).to.deep.equal({ @@ -178,11 +176,11 @@ describe('hyper-broker', function() { }; sender.send = chai.spy((msg) => { - regTrace = msg.data.kv; + regTrace = msg.data; expect(msg).to.deep.equal({ id: idTrace, rsp: RESULT_ACK, - data: {kv:regTrace} + data: regTrace }); }); @@ -216,10 +214,10 @@ describe('hyper-broker', function() { sender.send = chai.spy((msg) => { expect(msg).to.deep.equal({ - id: idPush, - qid: regPush, - rsp: RESULT_OK, - data: 'confirm-data' + id: idPush, + qid: regPush, + rsp: RESULT_OK, + data: 'confirm-data' }); }); diff --git a/test/client.js b/test/client.js index f48a17a..b89b156 100644 --- a/test/client.js +++ b/test/client.js @@ -39,7 +39,7 @@ describe('clent', function() { expectCommand = { ft: 'ECHO', id: 1, - data: { kv: 1234 } + data: 1234 }; client.echo(1234); expect(sender.send).to.have.been.called.once(); @@ -50,7 +50,7 @@ describe('clent', function() { ft: 'CALL', uri: 'function.queue.name', id: 1, - data: { kv:{attr1:1, attr2:'value'} } + data: {attr1:1, attr2:'value'} }; client.call('function.queue.name', {attr1:1, attr2:'value'}); expect(sender.send).to.have.been.called.once(); @@ -63,7 +63,7 @@ describe('clent', function() { ack: true, opt: {some:'option'}, id: 1, - data: { kv: {attr1:1, attr2:'value'} } + data: {attr1:1, attr2:'value'} }; client.push('function.queue.name', {attr1:1, attr2:'value'}, {some:'option'}); expect(sender.send).to.have.been.called.once(); @@ -76,7 +76,7 @@ describe('clent', function() { ack: true, opt: {}, id: 1, - data: { kv:{key:'val'} } + data: {key:'val'} }; client.push('function.queue.name', {key:'val'}); expect(sender.send).to.have.been.called.once(); @@ -115,7 +115,7 @@ describe('clent', function() { ft: 'CONFIRM', rqt: RESULT_OK, qid: 'server-generated-trace-id', - data: {kv:"task-data-amended"} + data: "task-data-amended" }; // some PUBLISH occurred and data arrived @@ -124,7 +124,7 @@ describe('clent', function() { uri: 'any-text', rsp: REQUEST_EVENT, qid: 'server-generated-trace-id', - data: {kv:'task-data'} + data: 'task-data' }); // trace event invoked @@ -142,7 +142,7 @@ describe('clent', function() { ft: 'YIELD', rqt: RESULT_EMIT, qid: 'generaged.id', - data: { kv: {dataKey:'data-value'} } + data: {dataKey:'data-value'} }; var request = {}; request.id = 'no.meaning.client.task.id';