From e4a70db24556008c269057723ee0f2ae177597d8 Mon Sep 17 00:00:00 2001 From: Norbert Heusser Date: Fri, 23 Aug 2024 17:34:22 +0000 Subject: [PATCH] Add custom request to encoding/decoding --- include/raft.h.in | 14 +++++++++- src/uv_encoding.c | 70 +++++++++++++++++++++++++++++++++++++++-------- src/uv_recv.c | 3 ++ 3 files changed, 74 insertions(+), 13 deletions(-) diff --git a/include/raft.h.in b/include/raft.h.in index 6a2cb95c..a01055b5 100644 --- a/include/raft.h.in +++ b/include/raft.h.in @@ -385,6 +385,16 @@ struct raft_timeout_now raft_index last_log_term; /* Term of log entry at last_log_index. */ }; +/** + * Hold the custom request data + */ +struct raft_request_custom +{ + unsigned char version; + raft_term term; /* Receiver's current_term. */ + struct raft_buffer data; /* Raw request data. */ +}; + /** * Type codes for RPC messages. */ @@ -394,7 +404,8 @@ enum raft_message_type { RAFT_REQUEST_VOTE, RAFT_REQUEST_VOTE_RESULT, RAFT_INSTALL_SNAPSHOT, - RAFT_TIMEOUT_NOW + RAFT_TIMEOUT_NOW, + RAFT_REQUEST_CUSTOM }; /** @@ -436,6 +447,7 @@ struct raft_message struct raft_append_entries_result append_entries_result; struct raft_install_snapshot install_snapshot; struct raft_timeout_now timeout_now; + struct raft_request_custom request_custom; }; }; diff --git a/src/uv_encoding.c b/src/uv_encoding.c index 3aa2c2dd..20233c2f 100644 --- a/src/uv_encoding.c +++ b/src/uv_encoding.c @@ -89,6 +89,12 @@ static size_t sizeofTimeoutNow(void) sizeof(uint64_t) /* Last log term. */; } +static size_t sizeofRequestCustom(void) +{ + return sizeof(uint64_t) + /* Term. */ + sizeof(uint64_t); /* Length of custom data */ +} + size_t uvSizeofBatchHeader(size_t n) { return 8 + /* Number of entries in the batch, little endian */ @@ -198,6 +204,15 @@ static void encodeTimeoutNow(const struct raft_timeout_now *p, void *buf) bytePut64(&cursor, p->last_log_term); } +static void encodeRequestCustom(const struct raft_request_custom *p, + void *buf) +{ + uint8_t *cursor = buf; + + bytePut64(&cursor, p->term); /* Leader's term */ + bytePut64(&cursor, p->data.len); /* Length of snapshot data */ +} + int uvEncodeMessage(const struct raft_message *message, uv_buf_t **bufs, unsigned *n_bufs) @@ -234,6 +249,10 @@ int uvEncodeMessage(const struct raft_message *message, header.len += sizeofTimeoutNow(); version = message->timeout_now.version; break; + case RAFT_REQUEST_CUSTOM: + header.len += sizeofRequestCustom(); + version = message->timeout_now.version; + break; default: return RAFT_MALFORMED; }; @@ -254,6 +273,8 @@ int uvEncodeMessage(const struct raft_message *message, bytePut64(&cursor, header.len - RAFT_IO_UV__PREAMBLE_SIZE); + *n_bufs = 1; + /* Encode the request header. */ switch (message->type) { case RAFT_REQUEST_VOTE: @@ -264,30 +285,27 @@ int uvEncodeMessage(const struct raft_message *message, break; case RAFT_APPEND_ENTRIES: encodeAppendEntries(&message->append_entries, cursor); + /* For AppendEntries request we also send the entries payload. */ + *n_bufs += message->append_entries.n_entries; break; case RAFT_APPEND_ENTRIES_RESULT: encodeAppendEntriesResult(&message->append_entries_result, cursor); break; case RAFT_INSTALL_SNAPSHOT: encodeInstallSnapshot(&message->install_snapshot, cursor); + /* For InstallSnapshot request we also send the snapshot payload. */ + *n_bufs += 1; break; case RAFT_TIMEOUT_NOW: encodeTimeoutNow(&message->timeout_now, cursor); break; + case RAFT_REQUEST_CUSTOM: + encodeRequestCustom(&message->request_custom, cursor); + /* For RequestCustom we also send the custom data */ + *n_bufs += 1; + break; }; - *n_bufs = 1; - - /* For AppendEntries request we also send the entries payload. */ - if (message->type == RAFT_APPEND_ENTRIES) { - *n_bufs += message->append_entries.n_entries; - } - - /* For InstallSnapshot request we also send the snapshot payload. */ - if (message->type == RAFT_INSTALL_SNAPSHOT) { - *n_bufs += 1; - } - *bufs = raft_calloc(*n_bufs, sizeof **bufs); if (*bufs == NULL) { goto oom_after_header_alloc; @@ -295,6 +313,7 @@ int uvEncodeMessage(const struct raft_message *message, (*bufs)[0] = header; + if (message->type == RAFT_APPEND_ENTRIES) { unsigned i; for (i = 0; i < message->append_entries.n_entries; i++) { @@ -310,6 +329,11 @@ int uvEncodeMessage(const struct raft_message *message, (*bufs)[1].len = message->install_snapshot.data.len; } + if (message->type == RAFT_REQUEST_CUSTOM) { + (*bufs)[1].base = message->request_custom.data.base; + (*bufs)[1].len = message->request_custom.data.len; + } + return 0; oom_after_header_alloc: @@ -569,6 +593,24 @@ static void decodeTimeoutNow(const uv_buf_t *buf, struct raft_timeout_now *p) p->last_log_term = byteGet64(&cursor); } +static int decodeRequestCustom(unsigned char version, + const uv_buf_t *buf, + struct raft_request_custom* rp) +{ + const uint8_t *cursor; + + assert(buf != NULL); + assert(rp != NULL); + + cursor = (void *)buf->base; + + rp->version = version; + rp->term = byteGet64(&cursor); + rp->data.len = (size_t)byteGet64(&cursor); + + return 0; +} + int uvDecodeMessage(uint8_t type, uint8_t version, const uv_buf_t *header, @@ -610,6 +652,10 @@ int uvDecodeMessage(uint8_t type, case RAFT_TIMEOUT_NOW: decodeTimeoutNow(header, &message->timeout_now); break; + case RAFT_REQUEST_CUSTOM: + decodeRequestCustom(version, header, &message->request_custom); + *payload_len += message->request_custom.data.len; + break; default: rv = RAFT_IOERR; break; diff --git a/src/uv_recv.c b/src/uv_recv.c index 884a3beb..80b63e5c 100644 --- a/src/uv_recv.c +++ b/src/uv_recv.c @@ -298,6 +298,9 @@ static void uvServerReadCb(uv_stream_t *stream, case RAFT_INSTALL_SNAPSHOT: s->message.install_snapshot.data.base = s->payload.base; break; + case RAFT_REQUEST_CUSTOM: + s->message.request_custom.data.base = s->payload.base; + break; default: /* We should never have read a payload in the first place */ assert(0);