From 5750bc312c26eee9f9aa4c2a0ad2cd6b66904771 Mon Sep 17 00:00:00 2001 From: Anatoly Tsapkov Date: Thu, 26 Mar 2020 00:48:26 -0400 Subject: [PATCH] filter to subscription --- .eslintrc.json | 1 + README.md | 12 ++++++++++ lib/realm.js | 55 ++++++++++++++++++++++++++++++++++++++++----- test/01.data_fit.js | 25 +++++++++++++++++++++ 4 files changed, 88 insertions(+), 5 deletions(-) create mode 100644 test/01.data_fit.js diff --git a/.eslintrc.json b/.eslintrc.json index 3c51b07..222d21f 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -13,6 +13,7 @@ }, "rules": { "mocha/no-exclusive-tests": "error", + "no-unused-vars": ["error", { "args": "none" }], "no-console": "off", "indent": [ "error", diff --git a/README.md b/README.md index 1f876bd..db9065f 100644 --- a/README.md +++ b/README.md @@ -108,6 +108,17 @@ The options above provide ability to use the server as Synchronization Service. is designed to delay acknowledge response of publish due to necessary conditions achieved that is described in `when` option. See the demo in `democli\resource-lock.js` +## Event Filter, coming soon +Subscribtion able to filter messages before firing out to reduce network consumption. + +```javascript +register('some.key.#', (args, kwargs) => { + // do some action here + }, + { filter: { type: 'post' } } +) +``` + ## Map-Reduce, coming soon Map-Reduce processing in terms of message queue is tranforming of the input stream to be passed to the corresponding event topic and reduced there. @@ -125,6 +136,7 @@ register('reduce.the.key.#', (args, kwargs, options) => { ### Subscribe Options * retained: boolean, corresponding values from key value storage will be returned as immidiate events. 
// --- Event-filter helpers -------------------------------------------------
// A subscription may carry `opt.filter`, a template object that incoming
// event payloads must satisfy before the event is pushed to the subscriber.

/**
 * Extract the comparable payload value from an event `data` envelope.
 * Supported envelope shapes, checked in this order:
 *   { kv: ... }      -> the kv value as-is
 *   { payload: ... } -> JSON.parse of the raw payload (string or Buffer)
 *   { args: ... }    -> the accompanying `kwargs` object
 *                       (filters match named arguments, not positional ones)
 * @param {object|null} data - event data envelope
 * @returns {*} the unwrapped value, or null for a null envelope
 * @throws {Error} when the envelope shape is not recognized
 */
function getValue (data) {
  if (data === null) {
    return null
  }
  if (typeof data === 'object') {
    if ('kv' in data) return data.kv
    if ('payload' in data) return JSON.parse(data.payload)
    if ('args' in data) {
      /// && Array.isArray(data.args)
      return data.kwargs
    }
  }
  throw new Error('unknown data `' + JSON.stringify(data) + '`')
}

/**
 * Recursively test whether `value` satisfies the `when` template.
 * Objects match when every key present in `when` also exists in `value`
 * and matches recursively; extra keys in `value` are ignored.
 * Scalars match with LOOSE equality on purpose, so that '1' matches 1
 * (the data_fit tests depend on this).
 * @param {*} when - filter template
 * @param {*} value - unwrapped payload value (see getValue)
 * @returns {boolean} true when value satisfies the template
 */
function compareData (when, value) {
  if (when === null && value === null) {
    return true
  }

  if (when !== null && value !== null) {
    if (typeof when === 'object') {
      for (const name in when) {
        if (!(name in value) || !compareData(when[name], value[name])) {
          return false
        }
      }
      return true
    }
    // eslint-disable-next-line eqeqeq -- loose compare is intentional ('1' == 1)
    return when == value
  }
  return false
}

/**
 * Check whether an event data envelope satisfies a subscription filter.
 * @param {*} when - filter template from the subscribe options
 * @param {object|null} data - event data envelope (see getValue)
 * @returns {boolean}
 */
function isDataFit (when, data) {
  return compareData(when, getValue(data))
}

/**
 * Make an event body JSON-safe: a binary `payload` Buffer is replaced by
 * its base64 representation under the `p64` key. Falsy bodies (null,
 * undefined) pass through untouched.
 * @param {object|null} body - event body, possibly with a Buffer payload
 * @returns {object|null} JSON-serializable body
 */
function makeDataSerializable (body) {
  return (body && ('payload' in body) ? { p64: body.payload.toString('base64') } : body)
}

/**
 * Inverse of makeDataSerializable: restore a Buffer `payload` from `p64`.
 * FIX: guard against null/undefined bodies to mirror makeDataSerializable —
 * the original `'p64' in body` threw a TypeError when body was null.
 * @param {object|null} body - serialized body, possibly with a `p64` key
 * @returns {object|null} body with `payload` restored as a Buffer
 */
function unSerializeData (body) {
  return (body && ('p64' in body)
    ? { payload: Buffer.from(body.p64, 'base64') }
    : body)
}