From 4bdfb7713135cc5043d2b522eacda1b7912dc1d5 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Sat, 27 Jul 2019 11:55:20 +0100 Subject: [PATCH 01/13] (wip) bulk append --- create.js | 7 ++++++ index.js | 1 + minimal.js | 69 ++++++++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 64 insertions(+), 13 deletions(-) diff --git a/create.js b/create.js index ec47ca51..8c907f13 100644 --- a/create.js +++ b/create.js @@ -102,8 +102,15 @@ module.exports = function (path, opts, keys) { db.append({ content: content, keys: keys }, cb) } + + function addBulk(messages, cb) { + if (!cb) throw new Error("Expected callback to feed addBulk function") + else db.appendAll({messages: messages, keys: keys}, keys) + } + return { add: add, + addBulk: addBulk, publish: add, id: keys.id, keys: keys diff --git a/index.js b/index.js index 59240e3f..c9b08694 100644 --- a/index.js +++ b/index.js @@ -110,6 +110,7 @@ module.exports = { close : close, del: valid.async(ssb.del, 'msgLink'), publish : valid.async(feed.add, 'string|msgContent'), + publishAll : valid.async(feed.addAll, 'object'), add : valid.async(ssb.add, 'msg'), queue : valid.async(ssb.queue, 'msg'), get : valid.async(ssb.get, 'msgLink|number|object'), diff --git a/minimal.js b/minimal.js index 052aefee..16c0d9e8 100644 --- a/minimal.js +++ b/minimal.js @@ -119,12 +119,21 @@ module.exports = function (dirname, keys, opts) { state.queue = [] append(batch, function (err, v) { batch.forEach(function (data) { - db.post.set(u.originalData(data)) + if (isArray(data)) { + db.post.set(u.originalData(data)) + } else { + data.forEach(d => u.originalData(d)) + } }) cb(err, v) }) }, function reduce (_, msg) { - return V.append(state, hmacKey, msg) + if (isArray(msg)) { + // This is an atomic bulk append + return V.appendBulk(state, hmacKey, msg) + } else { + return V.append(state, hmacKey, msg) + } }, function (_state) { return state.queue.length > 1000 }, function isEmpty (_state) { @@ -179,17 +188,7 @@ module.exports = function (dirname, keys, opts) { db.append = wait(function (opts, cb) { try { var content = opts.content - var recps = opts.content.recps - if (recps) { - const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0 - if (isFeed(recps) || isNonEmptyArrayOfFeeds) { - recps = opts.content.recps = [].concat(recps) // force to array - content = opts.content = box(opts.content, recps) - } else { - const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps) - throw new Error(errMsg) - } - } + throwIfInvalidRecipients(content) var msg = V.create( state.feeds[opts.keys.id], @@ -211,6 +210,36 @@ module.exports = function (dirname, keys, opts) { }) }) + db.appendAll = wait(function (opts, cb) { + try { + var messages = opts.messages + messages.forEach(throwIfInvalidRecipients) + + var validatedMessages = messages.map(msg => { + var timestamp = timestamp() + V.create( + state.feeds[opts.keys.id], + opts.keys, opts.hmacKey || hmacKey, + msg, + timestamp + ) + }) + + queue(validatedMessages, function (err) { + if (err) return cb(err) + var data = state.queue[state.queue.length - 1] + flush.push(function () { + cb(null, data) + }) + }) + + } catch (err) { + cb(err) + return + } + + }) + db.buffer = function () { return queue.buffer } @@ -232,6 +261,20 @@ module.exports = function (dirname, keys, opts) { maps.push(fn) } + function throwIfInvalidRecipients(content) { + var recps = content.recps + if (recps) { + const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0 + if (isFeed(recps) || isNonEmptyArrayOfFeeds) { + recps = content.recps = [].concat(recps) // force to array + content = content = box(opts.content, recps) + } else { + const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps) + throw new Error(errMsg) + } + } + } + return db } From 17e14750e5540847ef493d3f4af60414707172be Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Sat, 27 Jul 2019 13:12:56 +0100 Subject: [PATCH 02/13] We will need a new 'createAll' function in ssb-validate --- minimal.js | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/minimal.js b/minimal.js index 16c0d9e8..6c96b8cd 100644 --- a/minimal.js +++ b/minimal.js @@ -215,15 +215,12 @@ module.exports = function (dirname, keys, opts) { var messages = opts.messages messages.forEach(throwIfInvalidRecipients) - var validatedMessages = messages.map(msg => { - var timestamp = timestamp() - V.create( - state.feeds[opts.keys.id], - opts.keys, opts.hmacKey || hmacKey, - msg, - timestamp - ) - }) + var validatedMessages = V.createAll( + state.feeds[opts.keys.id], + opts.keys, opts.hmacKey || hmacKey, + messages, + timestamp + ) queue(validatedMessages, function (err) { if (err) return cb(err) From d2b434c7a2f48cefd6f5580a6a06f0aa24294c71 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 31 Jul 2019 17:13:42 +0100 Subject: [PATCH 03/13] fix check --- minimal.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/minimal.js b/minimal.js index 6c96b8cd..236b1319 100644 --- a/minimal.js +++ b/minimal.js @@ -119,7 +119,7 @@ module.exports = function (dirname, keys, opts) { state.queue = [] append(batch, function (err, v) { batch.forEach(function (data) { - if (isArray(data)) { + if (!isArray(data)) { db.post.set(u.originalData(data)) } else { data.forEach(d => u.originalData(d)) From 94a28eaf5543949c9367db1f014026505faa966c Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Mon, 5 Aug 2019 17:33:49 +0100 Subject: [PATCH 04/13] Fix existing tests --- minimal.js | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/minimal.js b/minimal.js index 236b1319..507fc02d 100644 --- a/minimal.js +++ b/minimal.js @@ -188,7 +188,7 @@ module.exports = function (dirname, keys, opts) { db.append = wait(function (opts, cb) { try { var content = opts.content - throwIfInvalidRecipients(content) + content = boxOrThrow(content) var msg = V.create( state.feeds[opts.keys.id], @@ -213,7 +213,7 @@ module.exports = function (dirname, keys, opts) { db.appendAll = wait(function (opts, cb) { try { var messages = opts.messages - messages.forEach(throwIfInvalidRecipients) + messages = messages.map(boxOrThrow) var validatedMessages = V.createAll( state.feeds[opts.keys.id], @@ -258,18 +258,20 @@ module.exports = function (dirname, keys, opts) { maps.push(fn) } - function throwIfInvalidRecipients(content) { + function boxOrThrow(content) { var recps = content.recps if (recps) { const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0 if (isFeed(recps) || isNonEmptyArrayOfFeeds) { recps = content.recps = [].concat(recps) // force to array - content = content = box(opts.content, recps) + return box(content, recps) } else { const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps) throw new Error(errMsg) } } + + return content } return db From 8447d5550fbbc7559fb9ff986676457142c48e7d Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Tue, 6 Aug 2019 00:16:05 +0100 Subject: [PATCH 05/13] wip --- create.js | 2 +- indexes/clock.js | 8 +++++++- indexes/feed.js | 9 ++++++++- indexes/last.js | 8 ++++++++ indexes/links.js | 8 ++++++++ minimal.js | 18 ++++++++++++++++-- package.json | 2 +- test/bulkAdd.js | 45 +++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 94 insertions(+), 6 deletions(-) create mode 100644 test/bulkAdd.js diff --git a/create.js b/create.js index 8c907f13..e220383f 100644 --- a/create.js +++ b/create.js @@ -105,7 +105,7 @@ module.exports = function (path, opts, keys) { function addBulk(messages, cb) { if (!cb) throw new Error("Expected callback to feed addBulk function") - else db.appendAll({messages: messages, keys: keys}, keys) + else db.appendAll({messages: messages, keys: keys}, cb) } return { diff --git a/indexes/clock.js b/indexes/clock.js index a53f45a3..3175837c 100644 --- a/indexes/clock.js +++ b/indexes/clock.js @@ -3,11 +3,17 @@ var ltgt = require('ltgt') // 53 bit integer var MAX_INT = 0x1fffffffffffff var u = require('../util') - var ViewLevel = require('flumeview-level') +var isArray = Array.isArray module.exports = function (db, opts) { var createIndex = ViewLevel(2, function (data) { + if (isArray(data)) { + // The last element is the last element written in the + // last bulk append + data = data[data.length - 1] + } + return [[data.value.author, data.value.sequence]] }) diff --git a/indexes/feed.js b/indexes/feed.js index 7e8d63e8..541f69be 100644 --- a/indexes/feed.js +++ b/indexes/feed.js @@ -2,10 +2,11 @@ var pull = require('pull-stream') var ltgt = require('ltgt') var u = require('../util') - var ViewLevel = require('flumeview-level') +var isArray = Array.isArray function resolveTimestamp (msg) { + // fallback to sync time if no user timestamp or timestamp is after sync time if (!msg.value.timestamp || msg.timestamp < msg.value.timestamp) { return msg.timestamp @@ -16,6 +17,12 @@ function resolveTimestamp (msg) { module.exports = function (db) { var createIndex = ViewLevel(3, function (data) { + if (isArray(data)) { + // The last element is the last element written in the + // last bulk append + data = data[data.length - 1] + } + return [[resolveTimestamp(data), data.value.author]] }) diff --git a/indexes/last.js b/indexes/last.js index 6237aa0e..850ac4cd 100644 --- a/indexes/last.js +++ b/indexes/last.js @@ -1,9 +1,17 @@ var pull = require('pull-stream') var pCont = require('pull-cont') var Reduce = require('flumeview-reduce') +var isArray = Array.isArray module.exports = function () { var createIndex = Reduce(1, function (acc, data) { + + if (isArray(data)) { + // The last element is the last element written in the + // last bulk append + data = data[data.length - 1] + } + if (!acc) acc = {} acc[data.value.author] = { id: data.key, diff --git a/indexes/links.js b/indexes/links.js index e9581903..8bccb8df 100644 --- a/indexes/links.js +++ b/indexes/links.js @@ -6,6 +6,8 @@ var u = require('../util') var Format = u.formatStream var mlib = require('ssb-msgs') +var isArray = Array.isArray + function isString (s) { return typeof s === 'string' } @@ -31,6 +33,12 @@ module.exports = function () { } var createIndex = ViewLevel(2, function (data) { + if (isArray(data)) { + // The last element is the last element written in the + // last bulk append + data = data[data.length - 1] + } + return indexMsg(data.timestamp, data.key, data.value) }) diff --git a/minimal.js b/minimal.js index 507fc02d..4b12a8d2 100644 --- a/minimal.js +++ b/minimal.js @@ -16,7 +16,20 @@ var isArray = Array.isArray function isFunction (f) { return typeof f === 'function' } function unbox (data, unboxers, key) { + + if (isArray(data)) { + return data.map(function(msg) { + return unboxOne(msg, unboxers, key) + }) + } else { + return unboxOne(data, unboxers, key) + } + +} + +function unboxOne(data, unboxers, key) { var plaintext + if (data && isString(data.value.content)) { for (var i = 0; i < unboxers.length; i++) { var unboxer = unboxers[i] @@ -217,9 +230,10 @@ module.exports = function (dirname, keys, opts) { var validatedMessages = V.createAll( state.feeds[opts.keys.id], - opts.keys, opts.hmacKey || hmacKey, + opts.keys, + opts.hmacKey || hmacKey, messages, - timestamp + timestamp() ) queue(validatedMessages, function (err) { diff --git a/package.json b/package.json index 9b844c26..7c01b533 100644 --- a/package.json +++ b/package.json @@ -28,7 +28,7 @@ "ssb-keys": "^7.1.3", "ssb-msgs": "^5.0.0", "ssb-ref": "^2.12.0", - "ssb-validate": "^4.0.0", + "ssb-validate": "file:../ssb-validate", "typewiselite": "^1.0.0", "zerr": "^1.0.0" }, diff --git a/test/bulkAdd.js b/test/bulkAdd.js new file mode 100644 index 00000000..7dea7c6c --- /dev/null +++ b/test/bulkAdd.js @@ -0,0 +1,45 @@ +'use strict' +var tape = require('tape') +var pull = require('pull-stream') +var crypto = require('crypto') + +var createSSB = require('./util') + +module.exports = function (opts) { + + tape('add bulk', function (t) { + var ssb = createSSB('test-ssb-feed', {}) + + var f = ssb.createFeed() + + var messages = [ + { + type: "test", + message: "test body" + }, + { + type: "test", + message: "test body 2" + } + ] + + f.addBulk(messages, function (err, ary) { + if (err) throw err + t.equal(ary.length, 2) + }) + + + }) + + tape('add bulk single item', function (t) { + + + }) + + tape('interleave bulk and singular', function (t) { + + }) + +} + +if (!module.parent) { module.exports({}) } \ No newline at end of file From 3416736941fd2a8b1f4d427b9a6b5be3ef854c6c Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Tue, 6 Aug 2019 17:38:16 +0100 Subject: [PATCH 06/13] some progress. --- minimal.js | 73 ++++++++++++++++++++++++++++++++++++++++++++++--- test/bulkAdd.js | 6 ++-- 2 files changed, 72 insertions(+), 7 deletions(-) diff --git a/minimal.js b/minimal.js index 4b12a8d2..6d389160 100644 --- a/minimal.js +++ b/minimal.js @@ -11,6 +11,8 @@ var ssbKeys = require('ssb-keys') var box = ssbKeys.box var u = require('./util') var isFeed = require('ssb-ref').isFeed +var pull = require('pull-stream') +var asyncMap = require('pull-stream/throughs/async-map') var isArray = Array.isArray function isFunction (f) { return typeof f === 'function' } @@ -130,16 +132,49 @@ module.exports = function (dirname, keys, opts) { var queue = AsyncWrite(function (_, cb) { var batch = state.queue state.queue = [] - append(batch, function (err, v) { - batch.forEach(function (data) { + + var batchAppend = batch.findIndex(function (elem) { + return isArray(elem) + }); + + if (batchAppend === -1) { + append(batch, function (err, v) { + handlePost(batch) + cb(err, v) + }) + } else { + var batchIndexes = findBatchIndexRanges(batch) + + pull( + pull.values(batchIndexes), + asyncMap(function(item, mapCb) { + var startIndex = item[0] + var endIndex = item[1] + var slice = batch.slice(startIndex, endIndex) + + append(slice, function(err, v) { + handlePost(slice) + mapCb(err, v) + }) + } + ), + pull.drain(null, function(err, v) { + cb(err, v) + })) + + } + + function handlePost(d) { + d.forEach(function (data) { if (!isArray(data)) { db.post.set(u.originalData(data)) } else { data.forEach(d => u.originalData(d)) } }) - cb(err, v) - }) + } + + }, function reduce (_, msg) { if (isArray(msg)) { // This is an atomic bulk append @@ -161,6 +196,36 @@ module.exports = function (dirname, keys, opts) { } } + function findBatchIndexRanges(batch) { + + var batchIndexes = batch.map(function(elem, index) { + if (isArray(elem)) { + return index + } else { + return null + } + + }).filter(function(elem) { + return elem === null + }) + + var start = 0 + var result = [] + batchIndexes.forEach(function (batchIndex) { + result.push([start, batchIndex]) + result.push([batchIndex, batchIndex + 1]) + start = batchIndex + 1 + }) + + var lastBatchIndex = batchIndexes[batchIndexes - 1]; + + if (lastBatchIndex < (batch.length - 1)) { + result.push([lastBatchIndex + 1, batch.length]) + } + + return result + } + db.last.get(function (_, last) { // copy to so we avoid weirdness, because this object // tracks the state coming in to the database. diff --git a/test/bulkAdd.js b/test/bulkAdd.js index 7dea7c6c..8b541d95 100644 --- a/test/bulkAdd.js +++ b/test/bulkAdd.js @@ -26,18 +26,18 @@ module.exports = function (opts) { f.addBulk(messages, function (err, ary) { if (err) throw err t.equal(ary.length, 2) + t.end() }) }) tape('add bulk single item', function (t) { - - + t.end() }) tape('interleave bulk and singular', function (t) { - + t.end() }) } From 25434b2de054fd5fc52f85df1984d3f0d30e5807 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Tue, 6 Aug 2019 22:52:55 +0100 Subject: [PATCH 07/13] wip --- minimal.js | 14 +++++++++----- test/bulkAdd.js | 12 ++++++++++-- 2 files changed, 19 insertions(+), 7 deletions(-) diff --git a/minimal.js b/minimal.js index 6d389160..f2c7c1d3 100644 --- a/minimal.js +++ b/minimal.js @@ -152,8 +152,8 @@ module.exports = function (dirname, keys, opts) { var endIndex = item[1] var slice = batch.slice(startIndex, endIndex) - append(slice, function(err, v) { - handlePost(slice) + append(slice[0], function(err, v) { + handlePost(slice[0]) mapCb(err, v) }) } @@ -169,7 +169,7 @@ module.exports = function (dirname, keys, opts) { if (!isArray(data)) { db.post.set(u.originalData(data)) } else { - data.forEach(d => u.originalData(d)) + data.forEach(d => db.post.set(u.originalData(d))) } }) } @@ -206,13 +206,17 @@ module.exports = function (dirname, keys, opts) { } }).filter(function(elem) { - return elem === null + return elem !== null }) var start = 0 var result = [] batchIndexes.forEach(function (batchIndex) { - result.push([start, batchIndex]) + + if (start < batchIndex) { + result.push([start, batchIndex]) + } + result.push([batchIndex, batchIndex + 1]) start = batchIndex + 1 }) diff --git a/test/bulkAdd.js b/test/bulkAdd.js index 8b541d95..7c3de257 100644 --- a/test/bulkAdd.js +++ b/test/bulkAdd.js @@ -8,7 +8,7 @@ var createSSB = require('./util') module.exports = function (opts) { tape('add bulk', function (t) { - var ssb = createSSB('test-ssb-feed', {}) + var ssb = createSSB('test-ssb-feed2', {}) var f = ssb.createFeed() @@ -26,7 +26,15 @@ module.exports = function (opts) { f.addBulk(messages, function (err, ary) { if (err) throw err t.equal(ary.length, 2) - t.end() + + pull( + ssb.createFeedStream(), + pull.collect(function(err,result) { + t.equal(result.length, 2) + t.end() + }) + ) + }) From 1025e683d6dcee53207957104b16e82d886ea1b0 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 10:54:12 +0100 Subject: [PATCH 08/13] Remove un-necessary checks in indexes --- indexes/clock.js | 7 ------- indexes/feed.js | 7 ------- indexes/last.js | 8 -------- indexes/links.js | 8 -------- minimal.js | 1 - 5 files changed, 31 deletions(-) diff --git a/indexes/clock.js b/indexes/clock.js index 3175837c..30c7f261 100644 --- a/indexes/clock.js +++ b/indexes/clock.js @@ -4,16 +4,9 @@ var ltgt = require('ltgt') var MAX_INT = 0x1fffffffffffff var u = require('../util') var ViewLevel = require('flumeview-level') -var isArray = Array.isArray module.exports = function (db, opts) { var createIndex = ViewLevel(2, function (data) { - if (isArray(data)) { - // The last element is the last element written in the - // last bulk append - data = data[data.length - 1] - } - return [[data.value.author, data.value.sequence]] }) diff --git a/indexes/feed.js b/indexes/feed.js index 541f69be..c668fead 100644 --- a/indexes/feed.js +++ b/indexes/feed.js @@ -3,7 +3,6 @@ var pull = require('pull-stream') var ltgt = require('ltgt') var u = require('../util') var ViewLevel = require('flumeview-level') -var isArray = Array.isArray function resolveTimestamp (msg) { @@ -17,12 +16,6 @@ function resolveTimestamp (msg) { module.exports = function (db) { var createIndex = ViewLevel(3, function (data) { - if (isArray(data)) { - // The last element is the last element written in the - // last bulk append - data = data[data.length - 1] - } - return [[resolveTimestamp(data), data.value.author]] }) diff --git a/indexes/last.js b/indexes/last.js index 850ac4cd..6237aa0e 100644 --- a/indexes/last.js +++ b/indexes/last.js @@ -1,17 +1,9 @@ var pull = require('pull-stream') var pCont = require('pull-cont') var Reduce = require('flumeview-reduce') -var isArray = Array.isArray module.exports = function () { var createIndex = Reduce(1, function (acc, data) { - - if (isArray(data)) { - // The last element is the last element written in the - // last bulk append - data = data[data.length - 1] - } - if (!acc) acc = {} acc[data.value.author] = { id: data.key, diff --git a/indexes/links.js b/indexes/links.js index 8bccb8df..e9581903 100644 --- a/indexes/links.js +++ b/indexes/links.js @@ -6,8 +6,6 @@ var u = require('../util') var Format = u.formatStream var mlib = require('ssb-msgs') -var isArray = Array.isArray - function isString (s) { return typeof s === 'string' } @@ -33,12 +31,6 @@ module.exports = function () { } var createIndex = ViewLevel(2, function (data) { - if (isArray(data)) { - // The last element is the last element written in the - // last bulk append - data = data[data.length - 1] - } - return indexMsg(data.timestamp, data.key, data.value) }) diff --git a/minimal.js b/minimal.js index f2c7c1d3..c4b2075f 100644 --- a/minimal.js +++ b/minimal.js @@ -271,7 +271,6 @@ module.exports = function (dirname, keys, opts) { try { var content = opts.content content = boxOrThrow(content) - var msg = V.create( state.feeds[opts.keys.id], opts.keys, opts.hmacKey || hmacKey, From 54599d38915885fcde77373ae343331c7292be54 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 11:02:32 +0100 Subject: [PATCH 09/13] Add monotonic timestamp to each message. --- minimal.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/minimal.js b/minimal.js index c4b2075f..1180ab27 100644 --- a/minimal.js +++ b/minimal.js @@ -294,14 +294,19 @@ module.exports = function (dirname, keys, opts) { db.appendAll = wait(function (opts, cb) { try { var messages = opts.messages - messages = messages.map(boxOrThrow) + messages = messages.map(boxOrThrow).map(function(message) { + return { + content: message, + timestamp: timestamp() + } + + }) var validatedMessages = V.createAll( state.feeds[opts.keys.id], opts.keys, opts.hmacKey || hmacKey, - messages, - timestamp() + messages ) queue(validatedMessages, function (err) { From dbf216d3cbfcbf848521ba520796f55069d83f2a Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 11:06:54 +0100 Subject: [PATCH 10/13] Remove un-necessary check in unboxer --- minimal.js | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/minimal.js b/minimal.js index 1180ab27..2da8d7c6 100644 --- a/minimal.js +++ b/minimal.js @@ -18,18 +18,6 @@ var isArray = Array.isArray function isFunction (f) { return typeof f === 'function' } function unbox (data, unboxers, key) { - - if (isArray(data)) { - return data.map(function(msg) { - return unboxOne(msg, unboxers, key) - }) - } else { - return unboxOne(data, unboxers, key) - } - -} - -function unboxOne(data, unboxers, key) { var plaintext if (data && isString(data.value.content)) { @@ -67,6 +55,7 @@ function unboxOne(data, unboxers, key) { } } } + return data } From 216449c27f314f1d3abe6ed87e8060e91407eb2b Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 11:34:12 +0100 Subject: [PATCH 11/13] Basic interleave test. --- minimal.js | 12 +++++--- package.json | 1 + test/bulkAdd.js | 79 ++++++++++++++++++++++++++++++++++++++++++++----- 3 files changed, 81 insertions(+), 11 deletions(-) diff --git a/minimal.js b/minimal.js index 2da8d7c6..fc8706f6 100644 --- a/minimal.js +++ b/minimal.js @@ -55,7 +55,7 @@ function unbox (data, unboxers, key) { } } } - + return data } @@ -141,8 +141,12 @@ module.exports = function (dirname, keys, opts) { var endIndex = item[1] var slice = batch.slice(startIndex, endIndex) - append(slice[0], function(err, v) { - handlePost(slice[0]) + if (slice.length === 1 && isArray(slice[0])) { + slice = slice[0] + } + + append(slice, function(err, v) { + handlePost(slice) mapCb(err, v) }) } @@ -212,7 +216,7 @@ module.exports = function (dirname, keys, opts) { var lastBatchIndex = batchIndexes[batchIndexes - 1]; - if (lastBatchIndex < (batch.length - 1)) { + if (lastBatchIndex <= (batch.length - 1)) { result.push([lastBatchIndex + 1, batch.length]) } diff --git a/package.json b/package.json index 7c01b533..8c858260 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ }, "devDependencies": { "hexpp": "^2.0.0", + "multicb": "^1.2.2", "pull-abortable": "~4.1.0", "ssb-feed": "^2.2.1", "tape": "^4.8.0", diff --git a/test/bulkAdd.js b/test/bulkAdd.js index 7c3de257..5249f8c7 100644 --- a/test/bulkAdd.js +++ b/test/bulkAdd.js @@ -1,14 +1,15 @@ 'use strict' var tape = require('tape') var pull = require('pull-stream') -var crypto = require('crypto') + +var multicb = require('multicb') var createSSB = require('./util') module.exports = function (opts) { tape('add bulk', function (t) { - var ssb = createSSB('test-ssb-feed2', {}) + var ssb = createSSB('test-ssb-feed-bulk', {}) var f = ssb.createFeed() @@ -34,18 +35,82 @@ module.exports = function (opts) { t.end() }) ) - }) - - }) tape('add bulk single item', function (t) { - t.end() + var ssb = createSSB('test-ssb-feed-bulk2', {}) + + var f = ssb.createFeed() + + var messages = [ + { + type: "test", + message: "test body" + } + ] + + f.addBulk(messages, function (err, ary) { + if (err) throw err + t.equal(ary.length, 1) + + pull( + ssb.createFeedStream(), + pull.collect(function(err,result) { + t.equal(result.length, 1) + t.end() + }) + ) + }) }) tape('interleave bulk and singular', function (t) { - t.end() + + var ssb = createSSB('test-ssb-feed-bulk3', {}) + + var f = ssb.createFeed() + + var done = multicb() + + var messages = [ + { + type: "test", + message: "test body single" + }, + [ + { + type: "test", + message: "test bulk body 1" + }, + { + type: "test", + message: "test body 2" + } + ], + { + type: "test", + message: "test body single 2" + } + ]; + + messages.forEach(message => { + if (Array.isArray(message)) { + f.addBulk(message, done()) + } else { + f.add(message, done()) + } + }) + + done(function(err, results) { + pull( + ssb.createFeedStream(), + pull.collect(function(err, result) { + t.equal(result.length, 4) + t.end() + }) + ) + }) + }) } From 38ce1baa4638f6793c98c64f973fb2a854ae07f3 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 12:35:55 +0100 Subject: [PATCH 12/13] Fail on error for test. --- minimal.js | 11 ++++++++--- test/bulkAdd.js | 5 +++++ 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/minimal.js b/minimal.js index fc8706f6..212f150d 100644 --- a/minimal.js +++ b/minimal.js @@ -127,6 +127,7 @@ module.exports = function (dirname, keys, opts) { }); if (batchAppend === -1) { + append(batch, function (err, v) { handlePost(batch) cb(err, v) @@ -134,6 +135,8 @@ module.exports = function (dirname, keys, opts) { } else { var batchIndexes = findBatchIndexRanges(batch) + var finalResult = null; + pull( pull.values(batchIndexes), asyncMap(function(item, mapCb) { @@ -151,8 +154,10 @@ module.exports = function (dirname, keys, opts) { }) } ), - pull.drain(null, function(err, v) { - cb(err, v) + pull.drain(function(result) { + finalResult = result + }, function(err, v) { + cb(err, finalResult) })) } @@ -216,7 +221,7 @@ module.exports = function (dirname, keys, opts) { var lastBatchIndex = batchIndexes[batchIndexes - 1]; - if (lastBatchIndex <= (batch.length - 1)) { + if (lastBatchIndex < (batch.length - 1)) { result.push([lastBatchIndex + 1, batch.length]) } diff --git a/test/bulkAdd.js b/test/bulkAdd.js index 5249f8c7..e0d09737 100644 --- a/test/bulkAdd.js +++ b/test/bulkAdd.js @@ -102,6 +102,11 @@ module.exports = function (opts) { }) done(function(err, results) { + + if (err) { + t.fail(err) + } + pull( ssb.createFeedStream(), pull.collect(function(err, result) { From 28f15fcb14801fcfec465e4bdda941c505f53939 Mon Sep 17 00:00:00 2001 From: Gordon Martin Date: Wed, 7 Aug 2019 13:43:55 +0100 Subject: [PATCH 13/13] Add comments --- index.js | 4 +++- minimal.js | 22 ++++++++++++++++++---- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/index.js b/index.js index c9b08694..2a2e0aa7 100644 --- a/index.js +++ b/index.js @@ -110,7 +110,9 @@ module.exports = { close : close, del: valid.async(ssb.del, 'msgLink'), publish : valid.async(feed.add, 'string|msgContent'), - publishAll : valid.async(feed.addAll, 'object'), + + // An atomic append of many messages at once + publishAll : valid.async(feed.addBulk, 'object'), add : valid.async(ssb.add, 'msg'), queue : valid.async(ssb.queue, 'msg'), get : valid.async(ssb.get, 'msgLink|number|object'), diff --git a/minimal.js b/minimal.js index 212f150d..724673d8 100644 --- a/minimal.js +++ b/minimal.js @@ -122,17 +122,21 @@ module.exports = function (dirname, keys, opts) { var batch = state.queue state.queue = [] - var batchAppend = batch.findIndex(function (elem) { + var hasBatchAppends = batch.findIndex(function (elem) { return isArray(elem) - }); + }) !== -1; - if (batchAppend === -1) { + if (!hasBatchAppends) { append(batch, function (err, v) { handlePost(batch) cb(err, v) }) } else { + // If there are batch appends, we need to make sure we append those as one 'append' + // operation so that that the append is done atomically, and the appropriate callback + // is called via the flush queue to signal the write has completed + var batchIndexes = findBatchIndexRanges(batch) var finalResult = null; @@ -175,7 +179,6 @@ module.exports = function (dirname, keys, opts) { }, function reduce (_, msg) { if (isArray(msg)) { - // This is an atomic bulk append return V.appendBulk(state, hmacKey, msg) } else { return V.append(state, hmacKey, msg) @@ -194,6 +197,17 @@ module.exports = function (dirname, keys, opts) { } } + /** + * Takes an array of single messages and arrays (bulk messages) + * and returns a list of the slice indexes for the array that such that the + * bulk appends would be performed in one operation and the rest would + * performed in chunks. + * + * e.g. [single_message1, single_message2, [bulk_messages], single_message3, [bulk_messages]] + * would return [[0, 3], [3,4], [4,5], [5,6]] + * + * @param {*} batch the array of writes to be performed. + */ function findBatchIndexRanges(batch) { var batchIndexes = batch.map(function(elem, index) {