forked from unshiftio/liferaft
-
Notifications
You must be signed in to change notification settings - Fork 0
/
log.js
348 lines (316 loc) · 8.39 KB
/
log.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
const encode = require('encoding-down');
const levelup = require('levelup');
/**
 * @typedef Entry
 * @property {number} index The key for the entry
 * @property {number} term The term that the entry was saved in
 * @property {boolean} committed Whether the entry has been committed
 * @property {object[]} responses Acknowledgements ({address, ack}) from the nodes that have saved the log entry
 * @property {object} command The command to be used in the raft state machine
 */
/**
 * readEntries - Drains a database read stream and collects the `value` of
 * every record it emits.
 *
 * @param {object} db Database exposing `createReadStream(options)`
 * @param {object} [options] Range options handed to `createReadStream`
 * @return {Promise<Entry[]>} Values in the order the stream produced them;
 * rejects with the stream's error if it emits 'error'
 * @private
 */
function readEntries (db, options) {
  return new Promise((resolve, reject) => {
    const entries = [];

    db.createReadStream(options)
      .on('data', ({ value }) => {
        entries.push(value);
      })
      .on('error', reject)
      .on('end', () => {
        resolve(entries);
      });
  });
}

class Log {
  /**
   * @class
   * @param {object} node The raft node using this log
   * @param {object} [Options] Options object
   * @param {object} Options.[adapter= require('leveldown')] Leveldown adapter, defaults to leveldown
   * @param {string} Options.[path=''] Path to save the log db to
   * @return {Log}
   */
  constructor (node, {adapter = require('leveldown'), path = ''} = {}) {
    this.node = node;
    // Highest index committed through this instance; kept in memory only.
    this.committedIndex = 0;
    // NOTE(review): entries are keyed by their numeric index using the
    // 'binary' key encoding; the range queries below rely on the store
    // ordering those keys by index - confirm against encoding-down.
    this.db = levelup(encode(adapter(path), { valueEncoding: 'json', keyEncoding: 'binary'}));
  }

  /**
   * saveCommand - Saves a command to the log
   * Initially the command is uncommitted. Once a majority
   * of follower nodes have saved the log entry it will be
   * committed.
   *
   * A follower node will also use this method to save a received command to
   * its log
   *
   * @async
   * @param {object} command A json object to save to the log
   * @param {number} term Term to save with the log entry
   * @param {number} [index] Index to save the entry with. This is used by the
   * followers. When omitted (or falsy) the entry is stored one past the last index.
   * @return {Promise<Entry>} The entry that was written
   */
  async saveCommand (command, term, index) {
    if (!index) {
      const { index: lastIndex } = await this.getLastInfo();
      index = lastIndex + 1;
    }

    const entry = {
      term,
      index,
      committed: false,
      responses: [{
        address: this.node.address, // start with vote from leader
        ack: true
      }],
      command,
    };

    await this.put(entry);
    return entry;
  }

  /**
   * put - Save entry to database using the index as the key
   *
   * @async
   * @param {Entry} entry entry to save
   * @return {Promise<void>} Resolves once entry is saved
   * @public
   */
  put (entry) {
    return this.db.put(entry.index, entry);
  }

  /**
   * getEntriesAfter - Get all the entries after a specific index
   *
   * @param {number} index Index that entries must be greater than
   * @return {Promise<Entry[]>} returns all entries
   * @public
   */
  getEntriesAfter (index) {
    return readEntries(this.db, { gt: index });
  }

  /**
   * removeEntriesAfter - Removes all entries after a given index
   *
   * The deletions are submitted as one batch so the log is never left
   * partially truncated.
   *
   * @async
   * @param {Number} index Index to use to find all entries after
   * @return {Promise<void>} Returns once all entries are removed
   * @public
   */
  async removeEntriesAfter (index) {
    const entries = await this.getEntriesAfter(index);
    if (entries.length === 0) {
      return;
    }

    await this.db.batch(entries.map(entry => ({ type: 'del', key: entry.index })));
  }

  /**
   * has - Checks if entry exists at index
   *
   * Any failure to read the entry (including a missing key) is reported
   * as `false`; this method never rejects.
   *
   * @async
   * @param {number} index Index position to check if entry exists
   * @return {Promise<boolean>} Boolean on whether entry exists at index
   * @public
   */
  async has (index) {
    try {
      await this.db.get(index);
      return true;
    } catch (err) {
      return false;
    }
  }

  /**
   * get - Gets an entry at the specified index position
   *
   * @param {number} index Index position of entry
   * @return {Promise<Entry>} Promise of found entry; rejects with the
   * database's error (NotFoundError) if it does not exist
   * @public
   */
  get (index) {
    return this.db.get(index);
  }

  /**
   * getLastInfo - Returns index, term of the last entry in the log along with
   * the committedIndex
   *
   * @async
   * @return {Promise<Object>} Last entries index, term and committedIndex
   */
  async getLastInfo () {
    const { index, term } = await this.getLastEntry();

    return {
      index,
      term,
      committedIndex: this.committedIndex
    };
  }

  /**
   * getLastEntry - Returns last entry in the log
   *
   * @return {Promise<Entry>} returns {index: 0, term: node.term} if there are no entries in the log
   */
  async getLastEntry () {
    // Reading in reverse with a limit of one yields only the newest entry.
    const found = await readEntries(this.db, { reverse: true, limit: 1 });

    if (found.length === 0) {
      return { index: 0, term: this.node.term };
    }
    return found[found.length - 1];
  }

  /**
   * getEntryInfoBefore - Gets the index and term of the previous entry along with the log's committedIndex
   * If there is no item before it returns {index: 0}
   *
   * @async
   * @param {Entry} entry
   * @return {Promise<object>} {index, term, committedIndex}
   */
  async getEntryInfoBefore (entry) {
    const { index, term } = await this.getEntryBefore(entry);

    return {
      index,
      term,
      committedIndex: this.committedIndex
    };
  }

  /**
   * getEntryBefore - Get entry before the specified entry
   * If there is no item before it returns {index: 0, term: node.term}
   *
   * @async
   * @param {Entry} entry
   *
   * @return {Promise<Entry>}
   */
  async getEntryBefore (entry) {
    const defaultInfo = {
      index: 0,
      term: this.node.term
    };

    // We know it is the first entry, so save the query time
    if (entry.index === 1) {
      return defaultInfo;
    }

    const found = await readEntries(this.db, {
      reverse: true,
      limit: 1,
      lt: entry.index
    });

    // Falls back to the empty index if there are no items
    // before entry or the log is empty
    return found.length === 0 ? defaultInfo : found[0];
  }

  /**
   * commandAck - acknowledges that the follower with address has stored the entry at index
   * This is used to determine if a quorum has been met for a log entry and
   * if enough followers have stored it so that it can be committed
   *
   * @async
   * @param {number} index Index of entry that follower has stored
   * @param {string} address Address of follower that has stored log
   * @return {Promise<Entry>} The updated entry, or `{responses: []}` when
   * the entry could not be read
   */
  async commandAck (index, address) {
    let entry;
    try {
      entry = await this.get(index);
    } catch (err) {
      return {
        responses: []
      };
    }

    const alreadyAcked = entry.responses.some(resp => resp.address === address);
    if (alreadyAcked) {
      // Nothing changed, so there is nothing to write back.
      return entry;
    }

    // node hasn't voted yet. Add response
    entry.responses.push({
      address,
      ack: true
    });
    await this.put(entry);
    return entry;
  }

  /**
   * commit - Set the entry to committed
   *
   * The in-memory committedIndex is only moved once the entry has been
   * written, and it never moves backwards.
   *
   * @async
   * @param {number} index Index of the entry to commit
   *
   * @return {Promise<Entry>} The committed entry; rejects if the entry
   * cannot be read or written
   */
  async commit (index) {
    const entry = await this.db.get(index);
    entry.committed = true;
    await this.put(entry);
    this.committedIndex = Math.max(this.committedIndex, entry.index);
    return entry;
  }

  /**
   * getUncommittedEntriesUpToIndex - Returns all entries after the current
   * committedIndex, up to and including index, that have not been committed yet
   *
   * @param {number} index Index value to find all entries up to
   * @return {Promise<Entry[]>}
   * @private
   */
  async getUncommittedEntriesUpToIndex (index) {
    const candidates = await readEntries(this.db, {
      gt: this.committedIndex,
      lte: index
    });

    return candidates.filter(entry => !entry.committed);
  }

  /**
   * end - Log end
   * Called when the node is shutting down
   *
   * @return {*} Whatever the database's `close()` returns
   * (presumably a Promise when using levelup without a callback - confirm)
   * @private
   */
  end () {
    return this.db.close();
  }
}
module.exports = Log;