Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Introduce an atomic bulk append RPC function #264

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
7 changes: 7 additions & 0 deletions create.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,15 @@ module.exports = function (path, opts, keys) {

db.append({ content: content, keys: keys }, cb)
}

function addBulk(messages, cb) {
if (!cb) throw new Error("Expected callback to feed addBulk function")
else db.appendAll({messages: messages, keys: keys}, cb)
}

return {
add: add,
addBulk: addBulk,
publish: add,
id: keys.id,
keys: keys
Expand Down
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ module.exports = {
close : close,
del: valid.async(ssb.del, 'msgLink'),
publish : valid.async(feed.add, 'string|msgContent'),

// An atomic append of many messages at once
publishAll : valid.async(feed.addBulk, 'object'),
add : valid.async(ssb.add, 'msg'),
queue : valid.async(ssb.queue, 'msg'),
get : valid.async(ssb.get, 'msgLink|number|object'),
Expand Down
1 change: 0 additions & 1 deletion indexes/clock.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ var ltgt = require('ltgt')
// 53 bit integer
var MAX_INT = 0x1fffffffffffff
var u = require('../util')

var ViewLevel = require('flumeview-level')

module.exports = function (db, opts) {
Expand Down
2 changes: 1 addition & 1 deletion indexes/feed.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
var pull = require('pull-stream')
var ltgt = require('ltgt')
var u = require('../util')

var ViewLevel = require('flumeview-level')

function resolveTimestamp (msg) {

// fallback to sync time if no user timestamp or timestamp is after sync time
if (!msg.value.timestamp || msg.timestamp < msg.value.timestamp) {
return msg.timestamp
Expand Down
177 changes: 159 additions & 18 deletions minimal.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ var ssbKeys = require('ssb-keys')
var box = ssbKeys.box
var u = require('./util')
var isFeed = require('ssb-ref').isFeed
var pull = require('pull-stream')
var asyncMap = require('pull-stream/throughs/async-map')

var isArray = Array.isArray
function isFunction (f) { return typeof f === 'function' }

function unbox (data, unboxers, key) {
var plaintext

if (data && isString(data.value.content)) {
for (var i = 0; i < unboxers.length; i++) {
var unboxer = unboxers[i]
Expand Down Expand Up @@ -52,6 +55,7 @@ function unbox (data, unboxers, key) {
}
}
}

return data
}

Expand Down Expand Up @@ -117,14 +121,68 @@ module.exports = function (dirname, keys, opts) {
var queue = AsyncWrite(function (_, cb) {
var batch = state.queue
state.queue = []
append(batch, function (err, v) {
batch.forEach(function (data) {
db.post.set(u.originalData(data))

var hasBatchAppends = batch.findIndex(function (elem) {
return isArray(elem)
}) !== -1;

if (!hasBatchAppends) {

append(batch, function (err, v) {
handlePost(batch)
cb(err, v)
})
cb(err, v)
})
} else {
// If there are batch appends, we need to make sure we append those as one 'append'
// operation so that that the append is done atomically, and the appropriate callback
// is called via the flush queue to signal the write has completed

var batchIndexes = findBatchIndexRanges(batch)

var finalResult = null;

pull(
pull.values(batchIndexes),
asyncMap(function(item, mapCb) {
var startIndex = item[0]
var endIndex = item[1]
var slice = batch.slice(startIndex, endIndex)

if (slice.length === 1 && isArray(slice[0])) {
slice = slice[0]
}

append(slice, function(err, v) {
handlePost(slice)
mapCb(err, v)
})
}
),
pull.drain(function(result) {
finalResult = result
}, function(err, v) {
cb(err, finalResult)
}))

}

function handlePost(d) {
d.forEach(function (data) {
if (!isArray(data)) {
db.post.set(u.originalData(data))
} else {
data.forEach(d => db.post.set(u.originalData(d)))
}
})
}


}, function reduce (_, msg) {
return V.append(state, hmacKey, msg)
if (isArray(msg)) {
return V.appendBulk(state, hmacKey, msg)
} else {
return V.append(state, hmacKey, msg)
}
}, function (_state) {
return state.queue.length > 1000
}, function isEmpty (_state) {
Expand All @@ -139,6 +197,51 @@ module.exports = function (dirname, keys, opts) {
}
}

/**
* Takes an array of single messages and arrays (bulk messages)
* and returns a list of the slice indexes for the array that such that the
* bulk appends would be performed in one operation and the rest would
* performed in chunks.
*
* e.g. [single_message1, single_message2, [bulk_messages], single_message3, [bulk_messages]]
* would return [[0, 3], [3,4], [4,5], [5,6]]
*
* @param {*} batch the array of writes to be performed.
*/
function findBatchIndexRanges(batch) {

var batchIndexes = batch.map(function(elem, index) {
if (isArray(elem)) {
return index
} else {
return null
}

}).filter(function(elem) {
return elem !== null
})

var start = 0
var result = []
batchIndexes.forEach(function (batchIndex) {

if (start < batchIndex) {
result.push([start, batchIndex])
}

result.push([batchIndex, batchIndex + 1])
start = batchIndex + 1
})

var lastBatchIndex = batchIndexes[batchIndexes - 1];

if (lastBatchIndex < (batch.length - 1)) {
result.push([lastBatchIndex + 1, batch.length])
}

return result
}

db.last.get(function (_, last) {
// copy to so we avoid weirdness, because this object
// tracks the state coming in to the database.
Expand Down Expand Up @@ -179,18 +282,7 @@ module.exports = function (dirname, keys, opts) {
db.append = wait(function (opts, cb) {
try {
var content = opts.content
var recps = opts.content.recps
if (recps) {
const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0
if (isFeed(recps) || isNonEmptyArrayOfFeeds) {
recps = opts.content.recps = [].concat(recps) // force to array
content = opts.content = box(opts.content, recps)
} else {
const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps)
throw new Error(errMsg)
}
}

content = boxOrThrow(content)
var msg = V.create(
state.feeds[opts.keys.id],
opts.keys, opts.hmacKey || hmacKey,
Expand All @@ -211,6 +303,39 @@ module.exports = function (dirname, keys, opts) {
})
})

db.appendAll = wait(function (opts, cb) {
try {
var messages = opts.messages
messages = messages.map(boxOrThrow).map(function(message) {
return {
content: message,
timestamp: timestamp()
}

})

var validatedMessages = V.createAll(
state.feeds[opts.keys.id],
opts.keys,
opts.hmacKey || hmacKey,
messages
)

queue(validatedMessages, function (err) {
if (err) return cb(err)
var data = state.queue[state.queue.length - 1]
flush.push(function () {
cb(null, data)
})
})

} catch (err) {
cb(err)
return
}

})

db.buffer = function () {
return queue.buffer
}
Expand All @@ -232,6 +357,22 @@ module.exports = function (dirname, keys, opts) {
maps.push(fn)
}

function boxOrThrow(content) {
var recps = content.recps
if (recps) {
const isNonEmptyArrayOfFeeds = isArray(recps) && recps.every(isFeed) && recps.length > 0
if (isFeed(recps) || isNonEmptyArrayOfFeeds) {
recps = content.recps = [].concat(recps) // force to array
return box(content, recps)
} else {
const errMsg = 'private message recipients must be valid, was:' + JSON.stringify(recps)
throw new Error(errMsg)
}
}

return content
}

return db
}

3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,13 @@
"ssb-keys": "^7.1.3",
"ssb-msgs": "^5.0.0",
"ssb-ref": "^2.12.0",
"ssb-validate": "^4.0.0",
"ssb-validate": "file:../ssb-validate",
"typewiselite": "^1.0.0",
"zerr": "^1.0.0"
},
"devDependencies": {
"hexpp": "^2.0.0",
"multicb": "^1.2.2",
"pull-abortable": "~4.1.0",
"ssb-feed": "^2.2.1",
"tape": "^4.8.0",
Expand Down
Loading