-
Notifications
You must be signed in to change notification settings - Fork 1
/
reactive.js
117 lines (111 loc) · 4.46 KB
/
reactive.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/* Reactive engine. The code here combines the functions exposed by the
 * library into a single function to be run. All the code depending on
 * Bacon is kept here, so that if somebody wants to stop using Bacon in
 * the future, it will be easier. */
var lib2 = require('./lib')
var Bacon = require('baconjs')
// this is an adapter function. it gets a Pouch changes object and it
// returns a Bacon stream
function changesToStream (changes, lib) {
return Bacon.fromBinder(function (sink) {
changes
.on('change', function (change) {
sink(change.doc)
})
.on('error', function () {
sink(new Bacon.End())
})
.on('complete', function () {
sink(new Bacon.End())
})
})
}
module.exports.main = function (options, mockedLib) {
var lib = mockedLib || lib2
/* the initial configuration document is fetched from the
* database. this is updated when a change is detected */
var withOptions = lib.withOptions(options)
function onError (err) {
var text = 'error on a Bacon stream'
lib.log.error(text)
lib.log.error(JSON.stringify(err))
withOptions.captureMessage(text, { extra: err })
}
withOptions
.getFirstConfigurationDocument()
.then(function (firstConfigurationDocument) {
lib.log.debug('got configuration document ' + JSON.stringify(firstConfigurationDocument))
// expose for tests
module.exports.firstConfigurationDocument = firstConfigurationDocument
var configurationDocuments = (function () {
/* changes feeds are wrapped into `Bacon.repeat` in order to
* be robust to network conditions with a small timeout */
var stream = Bacon.repeat(function () {
var changes = withOptions.followers.configuration()
return changesToStream(changes, lib)
})
stream.onValue(function () {
lib.log.info('the configuration document changed on the database: reloading with new settings')
lib.log.debug(JSON.stringify(arguments))
})
stream.onError(onError)
return stream
.toProperty()
.startWith(firstConfigurationDocument)
})()
/* changes feeds are wrapped into `Bacon.repeat` in order to be
* robust to network conditions with a small timeout */
var changes = Bacon.repeat(function () {
return changesToStream(withOptions.followers.view(), lib)
})
changes.onError(onError)
// the configuration document is a property, while the changes
// are a stream. whenever a change is detected, we want to use
// the latest configuration document. this can be achieved with
// `property.sampledBy(stream, f)`
var changesAndConfigurations = configurationDocuments
.sampledBy(changes, function (configurationDocument, change) {
return {
configurationDocument: configurationDocument,
change: change
}
})
// mark the document as started. if the document goes through it
// has been marked and it will not be collected by the filtering
// view anymore
var marked = changesAndConfigurations.flatMap(function (obj) {
var promise = withOptions.markAsStarted(obj)
return Bacon.fromPromise(promise)
})
// we convert the returned promises to streams which will be
// concatenated by `flatMap`
var inlined = marked.flatMap(function (obj) {
var promise = withOptions.inline(obj)
return Bacon.fromPromise(promise)
})
// for every change event we create a stream of messages to be
// sent, then all those streams are flatted together. since we
// are generating streams from arrays, they will be closed
// automatically
var outgoing = inlined.flatMap(function (obj) {
try {
var outgoingArray = lib.dispatch(obj)
return Bacon.fromArray(outgoingArray)
} catch (error) {
withOptions.captureError(error)
return Bacon.never()
}
})
outgoing.onValue(withOptions.sendToMobile)
outgoing.onError(onError)
// export for testing
module.exports.changes = changes
module.exports.configurationDocuments = configurationDocuments
module.exports.changesAndConfigurations = changesAndConfigurations
module.exports.outgoing = outgoing
}, function () {
withOptions.captureMessage('error fetching the configuration document', {
extra: { 'promise error': JSON.stringify(arguments) }
})
})
}