diff --git a/create.js b/create.js
index ec47ca51..e220383f 100644
--- a/create.js
+++ b/create.js
@@ -102,8 +102,15 @@ module.exports = function (path, opts, keys) {
     db.append({ content: content, keys: keys }, cb)
   }
+
+  function addBulk (messages, cb) {
+    if (!cb) throw new Error('addBulk requires a callback')
+    else db.appendAll({ messages: messages, keys: keys }, cb)
+  }
+
   return {
     add: add,
+    addBulk: addBulk,
     publish: add,
     id: keys.id,
     keys: keys
diff --git a/index.js b/index.js
index 59240e3f..2a2e0aa7 100644
--- a/index.js
+++ b/index.js
@@ -110,6 +110,9 @@ module.exports = {
     close : close,
     del: valid.async(ssb.del, 'msgLink'),
     publish : valid.async(feed.add, 'string|msgContent'),
+
+    // An atomic append of many messages at once
+    publishAll : valid.async(feed.addBulk, 'object'),
     add : valid.async(ssb.add, 'msg'),
     queue : valid.async(ssb.queue, 'msg'),
     get : valid.async(ssb.get, 'msgLink|number|object'),
diff --git a/indexes/clock.js b/indexes/clock.js
index a53f45a3..30c7f261 100644
--- a/indexes/clock.js
+++ b/indexes/clock.js
@@ -3,7 +3,6 @@ var ltgt = require('ltgt')
 // 53 bit integer
 var MAX_INT = 0x1fffffffffffff
 var u = require('../util')
-
 var ViewLevel = require('flumeview-level')
 
 module.exports = function (db, opts) {
diff --git a/indexes/feed.js b/indexes/feed.js
index 7e8d63e8..c668fead 100644
--- a/indexes/feed.js
+++ b/indexes/feed.js
@@ -2,10 +2,10 @@ var pull = require('pull-stream')
 var ltgt = require('ltgt')
 var u = require('../util')
-
 var ViewLevel = require('flumeview-level')
 
 function resolveTimestamp (msg) {
+  // fall back to sync time if the user timestamp is missing or is after sync time
  if (!msg.value.timestamp || msg.timestamp < msg.value.timestamp) {
     return msg.timestamp
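
The create.js and index.js changes above expose the bulk write as `feed.addBulk` and `ssb.publishAll`. As a usage sketch (the `ssb` instance and message contents here are illustrative, not part of this diff; per the tests below, the callback receives the array of appended messages):

    var messages = [
      { type: 'post', text: 'one' },
      { type: 'post', text: 'two' }
    ]

    // the whole batch is validated and appended as one atomic operation
    ssb.publishAll(messages, function (err, msgs) {
      if (err) throw err
      console.log(msgs.length) // 2
    })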
diff --git a/minimal.js b/minimal.js
index 052aefee..724673d8 100644
--- a/minimal.js
+++ b/minimal.js
@@ -11,12 +11,15 @@ var ssbKeys = require('ssb-keys')
 var box = ssbKeys.box
 var u = require('./util')
 var isFeed = require('ssb-ref').isFeed
+var pull = require('pull-stream')
+var asyncMap = require('pull-stream/throughs/async-map')
 
 var isArray = Array.isArray
 
 function isFunction (f) { return typeof f === 'function' }
 function unbox (data, unboxers, key) {
   var plaintext
+
   if (data && isString(data.value.content)) {
     for (var i = 0; i < unboxers.length; i++) {
       var unboxer = unboxers[i]
@@ -52,6 +55,7 @@
       }
     }
   }
+
   return data
 }
@@ -117,14 +121,68 @@ module.exports = function (dirname, keys, opts) {
   var queue = AsyncWrite(function (_, cb) {
     var batch = state.queue
     state.queue = []
-    append(batch, function (err, v) {
-      batch.forEach(function (data) {
-        db.post.set(u.originalData(data))
+
+    var hasBatchAppends = batch.findIndex(function (elem) {
+      return isArray(elem)
+    }) !== -1
+
+    if (!hasBatchAppends) {
+
+      append(batch, function (err, v) {
+        handlePost(batch)
+        cb(err, v)
       })
-      cb(err, v)
-    })
+    } else {
+      // If there are batch appends, each batch must go through 'append' as a
+      // single operation so that it is written atomically, and the callback
+      // is invoked via the flush queue to signal the write has completed
+
+      var batchIndexes = findBatchIndexRanges(batch)
+
+      var finalResult = null
+
+      pull(
+        pull.values(batchIndexes),
+        asyncMap(function (item, mapCb) {
+          var startIndex = item[0]
+          var endIndex = item[1]
+          var slice = batch.slice(startIndex, endIndex)
+
+          // a lone array element is a bulk append: unwrap it so its
+          // messages are written as one batch
+          if (slice.length === 1 && isArray(slice[0])) {
+            slice = slice[0]
+          }
+
+          append(slice, function (err, v) {
+            handlePost(slice)
+            mapCb(err, v)
+          })
+        }),
+        pull.drain(function (result) {
+          finalResult = result
+        }, function (err) {
+          cb(err, finalResult)
+        })
+      )
+    }
+
+    function handlePost (d) {
+      d.forEach(function (data) {
+        if (!isArray(data)) {
+          db.post.set(u.originalData(data))
+        } else {
+          data.forEach(m => db.post.set(u.originalData(m)))
+        }
+      })
+    }
+
   }, function reduce (_, msg) {
-    return V.append(state, hmacKey, msg)
+    if (isArray(msg)) {
+      return V.appendBulk(state, hmacKey, msg)
+    } else {
+      return V.append(state, hmacKey, msg)
+    }
   }, function (_state) {
     return state.queue.length > 1000
   }, function isEmpty (_state) {
@@ -139,6 +197,51 @@ module.exports = function (dirname, keys, opts) {
     }
   }
 
+  /**
+   * Takes an array of single messages and arrays (bulk messages) and returns
+   * a list of slice index ranges for that array, such that each bulk append
+   * is performed as one atomic operation while runs of single messages are
+   * appended in chunks.
+   *
+   * e.g. [single1, single2, [bulk_messages], single3, [bulk_messages]]
+   * would return [[0, 2], [2, 3], [3, 4], [4, 5]]
+   *
+   * @param {Array} batch the array of writes to be performed.
+   */
+  function findBatchIndexRanges (batch) {
+
+    var batchIndexes = batch.map(function (elem, index) {
+      if (isArray(elem)) {
+        return index
+      } else {
+        return null
+      }
+
+    }).filter(function (elem) {
+      return elem !== null
+    })
+
+    var start = 0
+    var result = []
+    batchIndexes.forEach(function (batchIndex) {
+
+      if (start < batchIndex) {
+        result.push([start, batchIndex])
+      }
+
+      result.push([batchIndex, batchIndex + 1])
+      start = batchIndex + 1
+    })
+
+    var lastBatchIndex = batchIndexes[batchIndexes.length - 1]
+
+    if (lastBatchIndex < (batch.length - 1)) {
+      result.push([lastBatchIndex + 1, batch.length])
+    }
+
+    return result
+  }
+
   db.last.get(function (_, last) {
     // copy to so we avoid weirdness, because this object
     // tracks the state coming in to the database.
@@ -179,18 +282,7 @@ module.exports = function (dirname, keys, opts) {
   db.append = wait(function (opts, cb) {
     try {
       var content = opts.content
-      var recps = opts.content.recps
-      if (recps) {
-        const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0
-        if (isFeed(recps) || isNonEmptyArrayOfFeeds) {
-          recps = opts.content.recps = [].concat(recps) // force to array
-          content = opts.content = box(opts.content, recps)
-        } else {
-          const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps)
-          throw new Error(errMsg)
-        }
-      }
-
+      content = boxOrThrow(content)
       var msg = V.create(
         state.feeds[opts.keys.id],
         opts.keys, opts.hmacKey || hmacKey,
@@ -211,6 +303,39 @@
       })
   })
 
+  db.appendAll = wait(function (opts, cb) {
+    try {
+      var messages = opts.messages
+      messages = messages.map(boxOrThrow).map(function (message) {
+        return {
+          content: message,
+          timestamp: timestamp()
+        }
+
+      })
+
+      var validatedMessages = V.createAll(
+        state.feeds[opts.keys.id],
+        opts.keys,
+        opts.hmacKey || hmacKey,
+        messages
+      )
+
+      queue(validatedMessages, function (err) {
+        if (err) return cb(err)
+        var data = state.queue[state.queue.length - 1]
+        flush.push(function () {
+          cb(null, data)
+        })
+      })
+
+    } catch (err) {
+      cb(err)
+      return
+    }
+
+  })
+
   db.buffer = function () {
     return queue.buffer
   }
@@ -232,6 +357,22 @@
     maps.push(fn)
   }
 
+  function boxOrThrow (content) {
+    var recps = content.recps
+    if (recps) {
+      const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0
+      if (isFeed(recps) || isNonEmptyArrayOfFeeds) {
+        recps = content.recps = [].concat(recps) // force to array
+        return box(content, recps)
+      } else {
+        const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps)
+        throw new Error(errMsg)
+      }
+    }
+
+    return content
+  }
+
   return db
 }
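
To make the queue slicing concrete, here is how `findBatchIndexRanges` carves up an interleaved batch (values are illustrative):

    // queued writes: two singles, one bulk append, then another single
    var batch = [single1, single2, [bulkA, bulkB], single3]

    findBatchIndexRanges(batch)
    // => [[0, 2], [2, 3], [3, 4]]
    //    [0, 2] -> single1, single2 appended together as an ordinary chunk
    //    [2, 3] -> the bulk array, unwrapped and appended atomically
    //    [3, 4] -> single3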
diff --git a/package.json b/package.json
index 9b844c26..8c858260 100644
--- a/package.json
+++ b/package.json
@@ -28,12 +28,13 @@
     "ssb-keys": "^7.1.3",
     "ssb-msgs": "^5.0.0",
     "ssb-ref": "^2.12.0",
-    "ssb-validate": "^4.0.0",
+    "ssb-validate": "file:../ssb-validate",
     "typewiselite": "^1.0.0",
     "zerr": "^1.0.0"
   },
   "devDependencies": {
     "hexpp": "^2.0.0",
+    "multicb": "^1.2.2",
     "pull-abortable": "~4.1.0",
     "ssb-feed": "^2.2.1",
     "tape": "^4.8.0",
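
Note that `ssb-validate` now points at a local checkout rather than a published release. The diff leans on two additions to that module; judging from the call sites in minimal.js, their expected contract is roughly the following (my reading of this diff, not a published API):

    // V.createAll(feedState, keys, hmacKey, messages)
    //   like V.create, but signs an ordered array of { content, timestamp }
    //   entries, chaining the previous-message link across the batch, and
    //   returns the array of signed messages
    //
    // V.appendBulk(state, hmacKey, msgs)
    //   like V.append, but folds a whole array of signed messages into the
    //   validation state in one reduce step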
diff --git a/test/bulkAdd.js b/test/bulkAdd.js
new file mode 100644
index 00000000..e0d09737
--- /dev/null
+++ b/test/bulkAdd.js
@@ -0,0 +1,123 @@
+'use strict'
+var tape = require('tape')
+var pull = require('pull-stream')
+
+var multicb = require('multicb')
+
+var createSSB = require('./util')
+
+module.exports = function (opts) {
+
+  tape('add bulk', function (t) {
+    var ssb = createSSB('test-ssb-feed-bulk', {})
+
+    var f = ssb.createFeed()
+
+    var messages = [
+      {
+        type: 'test',
+        message: 'test body'
+      },
+      {
+        type: 'test',
+        message: 'test body 2'
+      }
+    ]
+
+    f.addBulk(messages, function (err, ary) {
+      if (err) throw err
+      t.equal(ary.length, 2)
+
+      pull(
+        ssb.createFeedStream(),
+        pull.collect(function (err, result) {
+          t.equal(result.length, 2)
+          t.end()
+        })
+      )
+    })
+  })
+
+  tape('add bulk single item', function (t) {
+    var ssb = createSSB('test-ssb-feed-bulk2', {})
+
+    var f = ssb.createFeed()
+
+    var messages = [
+      {
+        type: 'test',
+        message: 'test body'
+      }
+    ]
+
+    f.addBulk(messages, function (err, ary) {
+      if (err) throw err
+      t.equal(ary.length, 1)
+
+      pull(
+        ssb.createFeedStream(),
+        pull.collect(function (err, result) {
+          t.equal(result.length, 1)
+          t.end()
+        })
+      )
+    })
+  })
+
+  tape('interleave bulk and singular', function (t) {
+
+    var ssb = createSSB('test-ssb-feed-bulk3', {})
+
+    var f = ssb.createFeed()
+
+    var done = multicb()
+
+    var messages = [
+      {
+        type: 'test',
+        message: 'test body single'
+      },
+      [
+        {
+          type: 'test',
+          message: 'test bulk body 1'
+        },
+        {
+          type: 'test',
+          message: 'test bulk body 2'
+        }
+      ],
+      {
+        type: 'test',
+        message: 'test body single 2'
+      }
+    ]
+
+    messages.forEach(message => {
+      if (Array.isArray(message)) {
+        f.addBulk(message, done())
+      } else {
+        f.add(message, done())
+      }
+    })
+
+    done(function (err, results) {
+
+      if (err) {
+        t.fail(err)
+      }
+
+      pull(
+        ssb.createFeedStream(),
+        pull.collect(function (err, result) {
+          t.equal(result.length, 4)
+          t.end()
+        })
+      )
+    })
+
+  })
+
+}
+
+if (!module.parent) { module.exports({}) }
\ No newline at end of file
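
Because of the trailing `if (!module.parent)` guard, the new test file can also be run on its own with `node test/bulkAdd.js`. The interleaved test uses multicb to wait for every `add`/`addBulk` callback before asserting that all four messages, from both the bulk and the single writes, reached the feed.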