Skip to content

Commit

Permalink
Simplified scalar data recv
Browse files Browse the repository at this point in the history
  • Loading branch information
amdrozdov committed Apr 21, 2024
1 parent 380478f commit 9b1ace0
Showing 1 changed file with 18 additions and 23 deletions.
41 changes: 18 additions & 23 deletions examples/async_replication/async_replicated_container.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,52 +169,47 @@ class AsyncReplicatedContainer {

Relay recv_relay(const std::unique_ptr<current::net::Connection>& c) {
// Get buffer length
std::vector<uint8_t> len_buf(sizeof(size_t));
auto size = c->BlockingRead(&len_buf[0], sizeof(size_t));
size_t buffer_len;
auto size = c->BlockingRead(reinterpret_cast<uint8_t*>(&buffer_len), sizeof(size_t));
if (!size) throw;
size_t* buffer_len = reinterpret_cast<size_t*>(len_buf.data());
*buffer_len = be64toh(*buffer_len);
buffer_len = be64toh(buffer_len);

// Get the key
std::vector<uint8_t> key_buf(*buffer_len);
size = c->BlockingRead(&key_buf[0], *buffer_len);
std::vector<uint8_t> key_buf(buffer_len);
size = c->BlockingRead(&key_buf[0], buffer_len);
if (!size) throw;
std::string key(key_buf.begin(), key_buf.end());

// Get the value
std::vector<uint8_t> val_buf(sizeof(uint32_t));
size = c->BlockingRead(&val_buf[0], sizeof(uint32_t));
uint32_t value;
size = c->BlockingRead(reinterpret_cast<uint8_t*>(&value), sizeof(value));
if (!size) throw;
uint32_t* value = reinterpret_cast<uint32_t*>(val_buf.data());
*value = ntohl(*value);
value = ntohl(value);

// Get replica_id length
std::vector<uint8_t> repl_buf(sizeof(size_t));
size = c->BlockingRead(&repl_buf[0], sizeof(size_t));
size_t repl_len;
size = c->BlockingRead(reinterpret_cast<uint8_t*>(&repl_len), sizeof(repl_len));
if (!size) throw;
size_t* repl_len = reinterpret_cast<size_t*>(repl_buf.data());
*repl_len = be64toh(*repl_len);
repl_len = be64toh(repl_len);

// Get the replica id
std::vector<uint8_t> repl_id_buf(*repl_len);
size = c->BlockingRead(&repl_id_buf[0], *repl_len);
std::vector<uint8_t> repl_id_buf(repl_len);
size = c->BlockingRead(&repl_id_buf[0], repl_len);
if (!size) throw;
std::string replica_id(repl_id_buf.begin(), repl_id_buf.end());

// Get the clock
std::vector<uint8_t> clock_buf(sizeof(uint64_t));
size = c->BlockingRead(&clock_buf[0], sizeof(uint64_t));
uint64_t clock_int;
size = c->BlockingRead(reinterpret_cast<uint8_t*>(&clock_int), sizeof(clock_int));
if (!size) throw;
uint64_t* clock_int = reinterpret_cast<uint64_t*>(clock_buf.data());
*clock_int = be64toh(*clock_int);
std::chrono::microseconds clock(*clock_int);
clock_int = be64toh(clock_int);

// Prepare the relay object
Relay result;
result.key = key;
result.value = *value;
result.value = value;
result.replica_id = replica_id;
result.clock = clock;
result.clock = std::chrono::microseconds(clock_int);

return result;
}
Expand Down

0 comments on commit 9b1ace0

Please sign in to comment.