Skip to content

Commit

Permalink
optimize data field structure
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Jan 17, 2019
1 parent dbde90d commit 0f4918c
Show file tree
Hide file tree
Showing 9 changed files with 129 additions and 119 deletions.
23 changes: 18 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,25 @@ publish('the.key', ['args'], {kwArgs:false}, {

### Aggregate Engine for the data streams

The functionality aimed to provide rapid access to continuously changed data to the web application.
Aggregate engine provides data update propagation for the subscribed clients.
The idea is to have definitions of cross table relations and calculation rules.
What if to define table structure with aggregation functions along?

The functionality aimed to provide rapid access to continuously changed
data to the web application.

The idea is to have definitions of cross table relations and calculation rules in one place.
Such table scheme could easy listen to the events stream and do changes
in the related tables accordingly.

The changes in tables could be transformed and
propagated as same events to the another aggregation tables
where it could be mixed with another sources.
Aggregate engine provides data change events for the subscribed clients.

In general the idea looks like materialized view that is based on event stream.


```javascript
"invoice" {
"invoice": {
"type": "object",
"properties": {
"date": { "type": "string" },
Expand All @@ -105,7 +118,7 @@ The idea is to have definitions of cross table relations and calculation rules.
}
},

"detail" {
"detail": {
"type": "aggregate",
"properties": {
"customer": { "type": "string" },
Expand Down
6 changes: 3 additions & 3 deletions bin/mqtt_gate.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ var
program = require('commander');

program
.option('-p, --wamp <port>', 'WAMP Server IP port', 9000)
.option('-q, --mqtt <port>', 'MQTT Server IP port', 1883)
.parse(process.argv);
.option('-p, --wamp <port>', 'WAMP Server IP port', 9000)
.option('-q, --mqtt <port>', 'MQTT Server IP port', 1883)
.parse(process.argv);

var app = new Router();
app.setLogTrace(true);
Expand Down
90 changes: 45 additions & 45 deletions democli/metatest.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,61 +15,61 @@ var connection = new autobahn.Connection({

connection.onopen = function (session, details) {

session.log("Session open.");
session.log("Session open.");

session.subscribe('wamp.session.on_join', function (publishArgs, kwargs, opts) {
console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs);
}).then(
function(subscription) {
console.log("subscription successfull wamp.session.on_join");
}
);
session.subscribe('wamp.session.on_join', function (publishArgs, kwargs, opts) {
console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs);
}).then(
function(subscription) {
console.log("subscription successfull wamp.session.on_join");
}
);

session.subscribe('wamp.session.on_leave', function (publishArgs, kwargs, opts) {
console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs);
}).then(
function(subscription) {
console.log("subscription successfull wamp.session.on_leave");
}
);
session.subscribe('wamp.session.on_leave', function (publishArgs, kwargs, opts) {
console.log('Event', opts.topic, 'received args', publishArgs, 'kwargs ',kwargs);
}).then(
function(subscription) {
console.log("subscription successfull wamp.session.on_leave");
}
);

session.call('wamp.registration.get').then(
function (result) {
session.log("registration.get =", typeof(result), result);
},
function (error) {
console.log("Call failed:", error);
});
session.call('wamp.registration.get').then(
function (result) {
session.log("registration.get =", typeof(result), result);
},
function (error) {
console.log("Call failed:", error);
});

session.call('wamp.session.count').then(
function (result) {
session.log("count =", typeof(result), result);
},
function (error) {
console.log("Call failed:", error);
});
session.call('wamp.session.count').then(
function (result) {
session.log("count =", typeof(result), result);
},
function (error) {
console.log("Call failed:", error);
});

var sessions = null;
session.call('wamp.session.list').then(
function (result) {
sessions = result;
session.log("list =", typeof(result), result);
function (result) {
sessions = result;
session.log("list =", typeof(result), result);

session.call('wamp.session.get', [sessions[0]]).then(
function (result) {
session.log("get =", typeof(result), result);
},
function (error) {
console.log("Call failed:", error);
});
},
function (error) {
console.log("Call failed:", error);
});
};
session.call('wamp.session.get', [sessions[0]]).then(
function (result) {
session.log("get =", typeof(result), result);
},
function (error) {
console.log("Call failed:", error);
});
},
function (error) {
console.log("Call failed:", error);
});
};

connection.onclose = function (reason, details) {
console.log("close connection:", reason, details);
console.log("close connection:", reason, details);
};

connection.open();
63 changes: 31 additions & 32 deletions lib/hyper/clientBase.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,51 +73,27 @@ CommandBase.prototype.getCommand = function ()
return this.command;
};

function parseData(data) {
var result = undefined;

if (data) {
if (data.hasOwnProperty('kv')) {
result = data.kv;
}
else if (data.hasOwnProperty('message')) {
// exception
result = data;
}
else if (data.hasOwnProperty('kwargs')) {
if (Array.isArray(data.args) && data.args.length > 0) {
result = data;
}
else {
result = data.kwargs;
}
}
}
return result;
}

// data could be array, task or request
CommandBase.prototype.settle = function(client, cmd)
{
var mode = cmd.rsp || '';
var data = parseData(cmd.data);

switch (mode)
{
case RESULT_ACK: this.resolve(data); return false;
case RESULT_OK: this.resolve(data); return true;
case RESULT_ERR: this.reject (data); return true;
case RESULT_EMIT: if (this.callback) this.callback(data); return false;
case RESULT_ACK: this.resolve(cmd.data); return false;
case RESULT_OK: this.resolve(cmd.data); return true;
case RESULT_ERR: this.reject (cmd.data); return true;
case RESULT_EMIT: if (this.callback) this.callback(cmd.data); return false;

case REQUEST_TASK:
case REQUEST_EVENT:
if (this.callback) {
var task = new Task(client.sendTaskResponse, cmd);
this.callback(data, task);
this.callback(cmd.data, task);
}
return false;
default:
this.reject(data);
this.reject(cmd.data);
return true;
}
};
Expand Down Expand Up @@ -204,7 +180,7 @@ function ClientBase() {
commandId++;
command.id = commandId;
if (undefined !== data) {
command.data = { kv: data };
command.data = data;
}

this.sender.send(command);
Expand All @@ -219,7 +195,7 @@ function ClientBase() {
var header = {
qid: request.qid,
rqt: responseMode,
data: { kv:data }
data: data
};
if (request.rsp === REQUEST_EVENT) {
header.ft = 'CONFIRM'; // event confirmed
Expand All @@ -241,6 +217,29 @@ function ClientBase() {
};
}

ClientBase.prototype.parseData = function(data) {
var result = undefined;

if (data) {
if (data.hasOwnProperty('kv')) {
result = data.kv;
}
else if (data.hasOwnProperty('message')) {
// exception
result = data;
}
else if (data.hasOwnProperty('kwargs')) {
if (Array.isArray(data.args) && data.args.length > 0) {
result = data;
}
else {
result = data.kwargs;
}
}
}
return result;
}

exports.CommandBase = CommandBase;
exports.Login = Login;
exports.Echo = Echo;
Expand Down
4 changes: 2 additions & 2 deletions lib/hyper/gate.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ cmdAck.REG = function (session, cmd) {
session.send({
id: cmd.id,
rsp: RESULT_ACK,
data: {kv:cmd.qid} // will unregister
data: cmd.qid // will unregister
});
};

Expand Down Expand Up @@ -181,7 +181,7 @@ cmdAck.TRACE = function (session, cmd) {
session.send({
id: cmd.id,
rsp: RESULT_ACK,
data: {kv:cmd.qid} // will unregister
data: cmd.qid // will unregister
});
};

Expand Down
28 changes: 14 additions & 14 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 0f4918c

Please sign in to comment.