Skip to content

Commit

Permalink
filter to subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
kalmyk committed Mar 26, 2020
1 parent 2eb42d0 commit 5750bc3
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 5 deletions.
1 change: 1 addition & 0 deletions .eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
},
"rules": {
"mocha/no-exclusive-tests": "error",
"no-unused-vars": ["error", { "args": "none" }],
"no-console": "off",
"indent": [
"error",
Expand Down
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,17 @@ The options above provide ability to use the server as Synchronization Service.
is designed to delay acknowledge response of publish due to necessary conditions achieved that
is described in `when` option. See the demo in `democli\resource-lock.js`

## Event Filter, coming soon
Subscribtion able to filter messages before firing out to reduce network consumption.

```javascript
register('some.key.#', (args, kwargs) => {
// do some action here
},
{ filter: { type: 'post' } }
)
```

## Map-Reduce, coming soon
Map-Reduce processing in terms of message queue is tranforming of the input stream
to be passed to the corresponding event topic and reduced there.
Expand All @@ -125,6 +136,7 @@ register('reduce.the.key.#', (args, kwargs, options) => {
### Subscribe Options
* retained: boolean, corresponding values from key value storage will be returned as immidiate events.
* reducer:
* filter: condition to filter messages that accepted by the subscription

### Aggregate Engine for the data streams

Expand Down
55 changes: 50 additions & 5 deletions lib/realm.js
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,13 @@ class ActorTrace extends Actor {
this.delayStack = []
}

filter (event) {
if (this.msg.opt.filter) {
return isDataFit(this.msg.opt.filter, event.data)
}
return true
}

sendEvent (cmd) {
cmd.id = this.msg.id
cmd.traceId = this.msg.qid
Expand Down Expand Up @@ -233,17 +240,52 @@ class ActorPush extends Actor {
}
}

function isDataFit (when, data) {
if (when === null && data === null) {
return true
function getValue (data) {
if (data === null) {
return null
}
if (typeof data === 'object') {
if ('kv' in data) return data.kv
if ('payload' in data) return JSON.parse(data.payload)
if ('args' in data) {
/// && Array.isArray(data.args)
return data.kwargs
}
}
throw new Error('unknown data `' + JSON.stringify(data) + '`')
}

if (when !== null && data !== null) {
function compareData (when, value) {
if (when === null && value === null) {
return true
}

if (when !== null && value !== null) {
if (typeof when === 'object') {
for (const name in when) {
if (!(name in value) || !compareData(when[name], value[name])) {
return false
}
}
return true
}
return when == value
}
return false
}

function isDataFit (when, data) {
return compareData(when, getValue(data))
}

function makeDataSerializable (body) {
return (body && ('payload' in body) ? { p64: body.payload.toString('base64') } : body)
}

function unSerializeData (body) {
return ('p64' in body ? { payload: Buffer.from(body.p64, 'base64') } : body)
}

class DeferMap {
constructor () {
/**
Expand Down Expand Up @@ -436,7 +478,7 @@ class BaseEngine {
const found = this.matchTrace(event.uri)
for (let i = 0; i < found.length; i++) {
const subD = found[i]
if (excludeSid !== subD.getSid()) {
if (excludeSid !== subD.getSid() && subD.filter(event)) {
destSID[subD.getSid()] = true
this.actorPush(subD, event)
}
Expand Down Expand Up @@ -832,6 +874,9 @@ exports.ActorPushKv = ActorPushKv
exports.KeyValueStorageAbstract = KeyValueStorageAbstract

exports.isDataFit = isDataFit
exports.makeDataSerializable = makeDataSerializable
exports.unSerializeData = unSerializeData

exports.DeferMap = DeferMap
exports.BaseEngine = BaseEngine
exports.MemEngine = MemEngine
Expand Down
25 changes: 25 additions & 0 deletions test/01.data_fit.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
'use strict'

const chai = require('chai')
const expect = chai.expect

const isDataFit = require('../lib/realm').isDataFit

describe('01. isDataFit', function () {
it('level-one-null', function () {
expect(isDataFit(null, null)).to.equal(true)
})

it('level-one-data-identical', function () {
expect(isDataFit(1, { kv: 1 })).to.equal(true)
})
it('level-one-data-failed', function () {
expect(isDataFit(1, { kv: 2 })).to.equal(false)
})
it('level-two-identical', function () {
expect(isDataFit({ field1: 1 }, { kv: { field1: '1', field2: 2 } })).to.equal(true)
})
it('level-two-failed', function () {
expect(isDataFit({ field: 1 }, { kv: { field: '2' } })).to.equal(false)
})
})

0 comments on commit 5750bc3

Please sign in to comment.