Skip to content

Commit

Permalink
Add custom request to encoding/decoding
Browse files Browse the repository at this point in the history
  • Loading branch information
NorbertHeusser committed Aug 23, 2024
1 parent 00defd1 commit e4a70db
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 13 deletions.
14 changes: 13 additions & 1 deletion include/raft.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,16 @@ struct raft_timeout_now
raft_index last_log_term; /* Term of log entry at last_log_index. */
};

/**
* Hold the custom request data
*/
struct raft_request_custom
{
unsigned char version;
raft_term term; /* Receiver's current_term. */
struct raft_buffer data; /* Raw request data. */
};

/**
* Type codes for RPC messages.
*/
Expand All @@ -394,7 +404,8 @@ enum raft_message_type {
RAFT_REQUEST_VOTE,
RAFT_REQUEST_VOTE_RESULT,
RAFT_INSTALL_SNAPSHOT,
RAFT_TIMEOUT_NOW
RAFT_TIMEOUT_NOW,
RAFT_REQUEST_CUSTOM
};

/**
Expand Down Expand Up @@ -436,6 +447,7 @@ struct raft_message
struct raft_append_entries_result append_entries_result;
struct raft_install_snapshot install_snapshot;
struct raft_timeout_now timeout_now;
struct raft_request_custom request_custom;
};
};

Expand Down
70 changes: 58 additions & 12 deletions src/uv_encoding.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ static size_t sizeofTimeoutNow(void)
sizeof(uint64_t) /* Last log term. */;
}

static size_t sizeofRequestCustom(void)
{
return sizeof(uint64_t) + /* Term. */
sizeof(uint64_t); /* Length of custom data */
}

size_t uvSizeofBatchHeader(size_t n)
{
return 8 + /* Number of entries in the batch, little endian */
Expand Down Expand Up @@ -198,6 +204,15 @@ static void encodeTimeoutNow(const struct raft_timeout_now *p, void *buf)
bytePut64(&cursor, p->last_log_term);
}

static void encodeRequestCustom(const struct raft_request_custom *p,
void *buf)
{
uint8_t *cursor = buf;

bytePut64(&cursor, p->term); /* Leader's term */
bytePut64(&cursor, p->data.len); /* Length of snapshot data */
}

int uvEncodeMessage(const struct raft_message *message,
uv_buf_t **bufs,
unsigned *n_bufs)
Expand Down Expand Up @@ -234,6 +249,10 @@ int uvEncodeMessage(const struct raft_message *message,
header.len += sizeofTimeoutNow();
version = message->timeout_now.version;
break;
case RAFT_REQUEST_CUSTOM:
header.len += sizeofRequestCustom();
version = message->timeout_now.version;
break;
default:
return RAFT_MALFORMED;
};
Expand All @@ -254,6 +273,8 @@ int uvEncodeMessage(const struct raft_message *message,

bytePut64(&cursor, header.len - RAFT_IO_UV__PREAMBLE_SIZE);

*n_bufs = 1;

/* Encode the request header. */
switch (message->type) {
case RAFT_REQUEST_VOTE:
Expand All @@ -264,37 +285,35 @@ int uvEncodeMessage(const struct raft_message *message,
break;
case RAFT_APPEND_ENTRIES:
encodeAppendEntries(&message->append_entries, cursor);
/* For AppendEntries request we also send the entries payload. */
*n_bufs += message->append_entries.n_entries;
break;
case RAFT_APPEND_ENTRIES_RESULT:
encodeAppendEntriesResult(&message->append_entries_result, cursor);
break;
case RAFT_INSTALL_SNAPSHOT:
encodeInstallSnapshot(&message->install_snapshot, cursor);
/* For InstallSnapshot request we also send the snapshot payload. */
*n_bufs += 1;
break;
case RAFT_TIMEOUT_NOW:
encodeTimeoutNow(&message->timeout_now, cursor);
break;
case RAFT_REQUEST_CUSTOM:
encodeRequestCustom(&message->request_custom, cursor);
/* For RequestCustom we also send the custom data */
*n_bufs += 1;
break;
};

*n_bufs = 1;

/* For AppendEntries request we also send the entries payload. */
if (message->type == RAFT_APPEND_ENTRIES) {
*n_bufs += message->append_entries.n_entries;
}

/* For InstallSnapshot request we also send the snapshot payload. */
if (message->type == RAFT_INSTALL_SNAPSHOT) {
*n_bufs += 1;
}

*bufs = raft_calloc(*n_bufs, sizeof **bufs);
if (*bufs == NULL) {
goto oom_after_header_alloc;
}

(*bufs)[0] = header;


if (message->type == RAFT_APPEND_ENTRIES) {
unsigned i;
for (i = 0; i < message->append_entries.n_entries; i++) {
Expand All @@ -310,6 +329,11 @@ int uvEncodeMessage(const struct raft_message *message,
(*bufs)[1].len = message->install_snapshot.data.len;
}

if (message->type == RAFT_REQUEST_CUSTOM) {
(*bufs)[1].base = message->request_custom.data.base;
(*bufs)[1].len = message->request_custom.data.len;
}

return 0;

oom_after_header_alloc:
Expand Down Expand Up @@ -569,6 +593,24 @@ static void decodeTimeoutNow(const uv_buf_t *buf, struct raft_timeout_now *p)
p->last_log_term = byteGet64(&cursor);
}

static int decodeRequestCustom(unsigned char version,
const uv_buf_t *buf,
struct raft_request_custom* rp)
{
const uint8_t *cursor;

assert(buf != NULL);
assert(rp != NULL);

cursor = (void *)buf->base;

rp->version = version;
rp->term = byteGet64(&cursor);
rp->data.len = (size_t)byteGet64(&cursor);

return 0;
}

int uvDecodeMessage(uint8_t type,
uint8_t version,
const uv_buf_t *header,
Expand Down Expand Up @@ -610,6 +652,10 @@ int uvDecodeMessage(uint8_t type,
case RAFT_TIMEOUT_NOW:
decodeTimeoutNow(header, &message->timeout_now);
break;
case RAFT_REQUEST_CUSTOM:
decodeRequestCustom(version, header, &message->request_custom);
*payload_len += message->request_custom.data.len;
break;
default:
rv = RAFT_IOERR;
break;
Expand Down
3 changes: 3 additions & 0 deletions src/uv_recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,9 @@ static void uvServerReadCb(uv_stream_t *stream,
case RAFT_INSTALL_SNAPSHOT:
s->message.install_snapshot.data.base = s->payload.base;
break;
case RAFT_REQUEST_CUSTOM:
s->message.request_custom.data.base = s->payload.base;
break;
default:
/* We should never have read a payload in the first place */
assert(0);
Expand Down

0 comments on commit e4a70db

Please sign in to comment.