This repository has been archived by the owner on May 14, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flumelog.diff
84 lines (72 loc) · 2.22 KB
/
flumelog.diff
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
diff --git a/index.js b/index.js
index 84474bd..98b7036 100644
--- a/index.js
+++ b/index.js
@@ -7,6 +7,8 @@ const debounce = require('lodash.debounce')
const AtomicFile = require('atomic-file/buffer')
const toBuffer = require('typedarray-to-buffer')
+const pull = require('pull-stream')
+
module.exports = function (db, indexesPath) {
function saveTypedArray(name, seq, count, arr, cb) {
const filename = path.join(indexesPath, name + ".index")
@@ -288,9 +290,9 @@ module.exports = function (db, indexesPath) {
var updatedTimestampIndex = false
const start = Date.now()
- db.stream({ gt: index.seq }).pipe({
- paused: false,
- write: function (data) {
+ pull(
+ db.stream({ gt: index.seq }),
+ pull.drain((data) => {
if (updateOffsetIndex(offset, data.seq))
updatedOffsetIndex = true
@@ -300,8 +302,7 @@ module.exports = function (db, indexesPath) {
updateIndexValue(op.data, index, data.value, offset)
offset++
- },
- end: () => {
+ }, () => {
var count = offset // incremented at end
console.log(`time: ${Date.now()-start}ms, total items: ${count}`)
@@ -315,8 +316,8 @@ module.exports = function (db, indexesPath) {
saveIndex(op.data.indexName, index.seq, index.data)
cb()
- }
- })
+ })
+ )
}
function createIndexes(missingIndexes, cb) {
@@ -333,10 +334,10 @@ module.exports = function (db, indexesPath) {
var updatedOffsetIndex = false
var updatedTimestampIndex = false
const start = Date.now()
-
- db.stream({}).pipe({
- paused: false,
- write: function (data) {
+
+ pull(
+ db.stream({}),
+ pull.drain((data) => {
var seq = data.seq
var buffer = data.value
@@ -354,8 +355,8 @@ module.exports = function (db, indexesPath) {
})
offset++
- },
- end: () => {
+ }, () => {
+ console.log("done", offset)
var count = offset // incremented at end
console.log(`time: ${Date.now()-start}ms, total items: ${count}`)
@@ -372,8 +373,8 @@ module.exports = function (db, indexesPath) {
}
cb()
- }
- })
+ })
+ )
}
function setupIndex(op) {