-
Notifications
You must be signed in to change notification settings - Fork 241
/
state_machine.hxx
373 lines (336 loc) · 12.8 KB
/
state_machine.hxx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
/************************************************************************
Modifications Copyright 2017-2019 eBay Inc.
Author/Developer(s): Jung-Sang Ahn
Original Copyright:
See URL: https://github.com/datatechnology/cornerstone
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
**************************************************************************/
#ifndef _STATE_MACHINE_HXX_
#define _STATE_MACHINE_HXX_
#include "async.hxx"
#include "basic_types.hxx"
#include "buffer.hxx"
#include "pp_util.hxx"
#include "ptr.hxx"
#include <unordered_map>
namespace nuraft {
class cluster_config;
class snapshot;
class state_machine {
__interface_body__(state_machine);
public:
struct ext_op_params {
ext_op_params(ulong _log_idx,
ptr<buffer>& _data)
: log_idx(_log_idx)
, data(_data)
{}
ulong log_idx;
ptr<buffer>& data;
// May add more parameters in the future.
};
/**
* Commit the given Raft log.
*
* NOTE:
* Given memory buffer is owned by caller, so that
* commit implementation should clone it if user wants to
* use the memory even after the commit call returns.
*
* Here provide a default implementation for facilitating the
* situation when application does not care its implementation.
*
* @param log_idx Raft log number to commit.
* @param data Payload of the Raft log.
* @return Result value of state machine.
*/
virtual ptr<buffer> commit(const ulong log_idx,
buffer& data) { return nullptr; }
/**
* (Optional)
* Extended version of `commit`, for users want to keep
* the data without any extra memory copy.
*/
virtual ptr<buffer> commit_ext(const ext_op_params& params)
{ return commit(params.log_idx, *params.data); }
/**
* (Optional)
* Handler on the commit of a configuration change.
*
* @param log_idx Raft log number of the configuration change.
* @param new_conf New cluster configuration.
*/
virtual void commit_config(const ulong log_idx, ptr<cluster_config>& new_conf) { }
/**
* Pre-commit the given Raft log.
*
* Pre-commit is called after appending Raft log,
* before getting acks from quorum nodes.
* Users can ignore this function if not needed.
*
* Same as `commit()`, memory buffer is owned by caller.
*
* @param log_idx Raft log number to commit.
* @param data Payload of the Raft log.
* @return Result value of state machine.
*/
virtual ptr<buffer> pre_commit(const ulong log_idx,
buffer& data) { return nullptr; }
/**
* (Optional)
* Extended version of `pre_commit`, for users want to keep
* the data without any extra memory copy.
*/
virtual ptr<buffer> pre_commit_ext(const ext_op_params& params)
{ return pre_commit(params.log_idx, *params.data); }
/**
* Rollback the state machine to given Raft log number.
*
* It will be called for uncommitted Raft logs only,
* so that users can ignore this function if they don't
* do anything on pre-commit.
*
* Same as `commit()`, memory buffer is owned by caller.
*
* @param log_idx Raft log number to commit.
* @param data Payload of the Raft log.
*/
virtual void rollback(const ulong log_idx,
buffer& data) {}
/**
* (Optional)
* Handler on the rollback of a configuration change.
* The configuration can be either committed or uncommitted one,
* and that can be checked by the given `log_idx`, comparing it with
* the current `cluster_config`'s log index.
*
* @param log_idx Raft log number of the configuration change.
* @param conf The cluster configuration to be rolled back.
*/
virtual void rollback_config(const ulong log_idx, ptr<cluster_config>& conf) { }
/**
* (Optional)
* Extended version of `rollback`, for users want to keep
* the data without any extra memory copy.
*/
virtual void rollback_ext(const ext_op_params& params)
{ rollback(params.log_idx, *params.data); }
/**
* (Optional)
* Return a hint about the preferred size (in number of bytes)
* of the next batch of logs to be sent from the leader.
*
* Only applicable on followers.
*
* @return The preferred size of the next log batch.
* `0` indicates no preferred size (any size is good).
* `positive value` indicates at least one log can be sent,
* (the size of that log may be bigger than this hint size).
* `negative value` indicates no log should be sent since this
* follower is busy handling pending logs.
*/
virtual int64 get_next_batch_size_hint_in_bytes() { return 0; }
/**
* (Deprecated)
* Save the given snapshot chunk to local snapshot.
* This API is for snapshot receiver (i.e., follower).
*
* Since snapshot itself may be quite big, save_snapshot_data()
* will be invoked multiple times for the same snapshot `s`. This
* function should decode the {offset, data} and re-construct the
* snapshot. After all savings are done, apply_snapshot() will be
* called at the end.
*
* Same as `commit()`, memory buffer is owned by caller.
*
* @param s Snapshot instance to save.
* @param offset Byte offset of given chunk.
* @param data Payload of given chunk.
*/
virtual void save_snapshot_data(snapshot& s,
const ulong offset,
buffer& data) {}
/**
* Save the given snapshot object to local snapshot.
* This API is for snapshot receiver (i.e., follower).
*
* This is an optional API for users who want to use logical
* snapshot. Instead of splitting a snapshot into multiple
* physical chunks, this API uses logical objects corresponding
* to a unique object ID. Users are responsible for defining
* what object is: it can be a key-value pair, a set of
* key-value pairs, or whatever.
*
* Same as `commit()`, memory buffer is owned by caller.
*
* @param s Snapshot instance to save.
* @param obj_id[in,out]
* Object ID.
* As a result of this API call, the next object ID
* that reciever wants to get should be set to
* this parameter.
* @param data Payload of given object.
* @param is_first_obj `true` if this is the first object.
* @param is_last_obj `true` if this is the last object.
*/
virtual void save_logical_snp_obj(snapshot& s,
ulong& obj_id,
buffer& data,
bool is_first_obj,
bool is_last_obj) {}
/**
* Apply received snapshot to state machine.
*
* @param s Snapshot instance to apply.
* @returm `true` on success.
*/
virtual bool apply_snapshot(snapshot& s) = 0;
/**
* (Deprecated)
* Read the given snapshot chunk.
* This API is for snapshot sender (i.e., leader).
*
* @param s Snapshot instance to read.
* @param offset Byte offset of given chunk.
* @param[out] data Buffer where the read chunk will be stored.
* @return Amount of bytes read.
* 0 if failed.
*/
virtual int read_snapshot_data(snapshot& s,
const ulong offset,
buffer& data) { return 0; }
/**
* Read the given snapshot object.
* This API is for snapshot sender (i.e., leader).
*
* Same as above, this is an optional API for users who want to
* use logical snapshot.
*
* @param s Snapshot instance to read.
* @param[in,out] user_snp_ctx
* User-defined instance that needs to be passed through
* the entire snapshot read. It can be a pointer to
* state machine specific iterators, or whatever.
* On the first `read_logical_snp_obj` call, it will be
* set to `null`, and this API may return a new pointer if necessary.
* Returned pointer will be passed to next `read_logical_snp_obj`
* call.
* @param obj_id Object ID to read.
* @param[out] data Buffer where the read object will be stored.
* @param[out] is_last_obj Set `true` if this is the last object.
* @return Negative number if failed.
*/
virtual int read_logical_snp_obj(snapshot& s,
void*& user_snp_ctx,
ulong obj_id,
ptr<buffer>& data_out,
bool& is_last_obj) {
data_out = buffer::alloc(4); // A dummy buffer.
is_last_obj = true;
return 0;
}
/**
* Free user-defined instance that is allocated by
* `read_logical_snp_obj`.
* This is an optional API for users who want to use logical snapshot.
*
* @param user_snp_ctx User-defined instance to free.
*/
virtual void free_user_snp_ctx(void*& user_snp_ctx) {}
/**
* Get the latest snapshot instance.
*
* This API will be invoked at the initialization of Raft server,
* so that the last last snapshot should be durable for server restart,
* if you want to avoid unnecessary catch-up.
*
* @return Pointer to the latest snapshot.
*/
virtual ptr<snapshot> last_snapshot() = 0;
/**
* Get the last committed Raft log number.
*
* This API will be invoked at the initialization of Raft server
* to identify what the last committed point is, so that the last
* committed index number should be durable for server restart,
* if you want to avoid unnecessary catch-up.
*
* @return Last committed Raft log number.
*/
virtual ulong last_commit_index() = 0;
/**
* Create a snapshot corresponding to the given info.
*
* @param s Snapshot info to create.
* @param when_done Callback function that will be called after
* snapshot creation is done.
*/
virtual void create_snapshot(snapshot& s,
async_result<bool>::handler_type& when_done) = 0;
/**
* Decide to create snapshot or not.
* Once the pre-defined condition is satisfied, Raft core will invoke
* this function to ask if it needs to create a new snapshot.
* If user-defined state machine does not want to create snapshot
* at this time, this function will return `false`.
*
* @return `true` if wants to create snapshot.
* `false` if does not want to create snapshot.
*/
virtual bool chk_create_snapshot() { return true; }
/**
* Decide to transfer leadership.
* Once the other conditions are met, Raft core will invoke
* this function to ask if it is allowed to transfer the
* leadership to other member.
*
* @return `true` if wants to transfer leadership.
* `false` if not.
*/
virtual bool allow_leadership_transfer() { return true; }
/**
* Parameters for `adjust_commit_index` API.
*/
struct adjust_commit_index_params {
adjust_commit_index_params()
: current_commit_index_(0)
, expected_commit_index_(0)
{}
/**
* The current committed index.
*/
uint64_t current_commit_index_;
/**
* The new target commit index determined by Raft.
*/
uint64_t expected_commit_index_;
/**
* A map of <peer ID, peer's log index>, including the
* leader and learners.
*/
std::unordered_map<int, uint64_t> peer_index_map_;
};
/**
* This function will be called when Raft succeeds in replicating logs
* to an arbitrary follower and attempts to commit logs. Users can manually
* adjust the commit index. The adjusted commit index should be equal to
* or greater than the given `current_commit_index`. Otherwise, no log
* will be committed.
*
* @param params Parameters.
* @return Adjusted commit index.
*/
virtual uint64_t adjust_commit_index(const adjust_commit_index_params& params) {
return params.expected_commit_index_;
}
};
}
#endif //_STATE_MACHINE_HXX_