diff --git a/examples/async_replication/async_replicated_container.h b/examples/async_replication/async_replicated_container.h index afc2af0e..580f2d82 100644 --- a/examples/async_replication/async_replicated_container.h +++ b/examples/async_replication/async_replicated_container.h @@ -4,6 +4,8 @@ #include "../../blocks/http/api.h" #include "../../bricks/time/chrono.h" +#include "vector_clock.h" + #include #include @@ -11,7 +13,7 @@ CURRENT_STRUCT(Relay) { CURRENT_FIELD(key, std::string); CURRENT_FIELD(value, uint32_t); CURRENT_FIELD(replica_id, std::string); - CURRENT_FIELD(clock, std::chrono::microseconds); + CURRENT_FIELD(clock, Clocks); }; struct ReplicationNode final { @@ -29,16 +31,17 @@ struct ReplicationConfig final { uint32_t max_waits; }; -struct SharedState final { - bool die = false; - std::map data; - std::map clock; - std::map > > replication_out; -}; - class AsyncReplicatedContainer { private: + struct SharedState final { + bool die = false; + std::map data; + std::map clock; + std::map > > replication_out; + }; + std::string sid; + uint32_t clock_id; uint16_t reader_port; bool is_ready = false; std::vector nodes; @@ -69,7 +72,10 @@ class AsyncReplicatedContainer { // Set clock and send update to replication thread if (replicate) { - state.clock[tuple.first] = current::time::Now(); + if (state.clock.find(tuple.first) == state.clock.end()) { + state.clock[tuple.first] = VectorClock(nodes.size(), clock_id); + } + state.clock[tuple.first].step(); // Send relay to each node for (auto& node : nodes) { @@ -92,6 +98,13 @@ class AsyncReplicatedContainer { is_verbose = config.is_verbose; show_network_err = config.show_network_errors; max_waits = config.max_waits; + + for (size_t i = 0; i < nodes.size(); i++) { + if (nodes[i].host == config.host && nodes[i].port == config.port) { + clock_id = i; + break; + } + } } ~AsyncReplicatedContainer() { stop(); } @@ -107,8 +120,7 @@ class AsyncReplicatedContainer { current::net::Connection connection(socket.Accept()); 
if (is_verbose) { - state.MutableUse( - [port](SharedState& state) { std::cout << "Reader connected on port " << port << std::endl; }); + state.MutableUse([port](SharedState&) { std::cout << "Reader connected on port " << port << std::endl; }); } readers.push_back(std::thread([this, conn = std::make_unique(std::move(connection))] { @@ -116,7 +128,7 @@ class AsyncReplicatedContainer { })); } catch (const current::Exception& e) { if (show_network_err) { - state.MutableUse([&e](SharedState& state) { + state.MutableUse([&e](SharedState&) { std::cout << "error reader" << ": " << e.OriginalDescription() << std::endl; }); @@ -130,7 +142,7 @@ class AsyncReplicatedContainer { } void replication_reader(const std::unique_ptr& connection) { - int waits = 0; + uint32_t waits = 0; Relay buffer; while (waits < max_waits) { @@ -141,15 +153,15 @@ class AsyncReplicatedContainer { // Get the data size buffer = recv_relay(connection); state.MutableUse([&buffer, this](SharedState& state) { - std::pair data(buffer.key, buffer.value); bool is_insert = state.data.find(buffer.key) == state.data.end(); - bool is_valid_update = (!is_insert) && (buffer.clock > state.clock[buffer.key]); - - // new row - if (is_insert || is_valid_update) { - state.clock[buffer.key] = buffer.clock; + if (is_insert) { + state.clock[buffer.key] = VectorClock(nodes.size(), clock_id); + } + bool is_valid_update = state.clock[buffer.key].merge(buffer.clock, is_insert); + if (is_valid_update) { state.data[buffer.key] = buffer.value; } + if (is_verbose) { if (is_insert) { std::cout << "NEW [" << buffer.replica_id << "] key " << buffer.key << std::endl; @@ -198,18 +210,22 @@ class AsyncReplicatedContainer { if (!size) throw; std::string replica_id(repl_id_buf.begin(), repl_id_buf.end()); - // Get the clock - uint64_t clock_int; - size = c->BlockingRead(reinterpret_cast(&clock_int), sizeof(clock_int)); - if (!size) throw; - clock_int = be64toh(clock_int); + // Get the clock vector + Clocks clock(nodes.size()); + for 
(size_t i = 0; i < nodes.size(); i++) { + uint64_t clock_int; + size = c->BlockingRead(reinterpret_cast(&clock_int), sizeof(clock_int)); + if (!size) throw; + clock_int = be64toh(clock_int); + clock[i] = std::chrono::microseconds(clock_int); + } // Prepare the relay object Relay result; result.key = key; result.value = value; result.replica_id = replica_id; - result.clock = std::chrono::microseconds(clock_int); + result.clock = clock; return result; } @@ -233,16 +249,18 @@ class AsyncReplicatedContainer { // Send the replica id buffer c.BlockingWrite(r.replica_id.c_str(), r.replica_id.length(), true); - // Send clock for given key/value - uint64_t time_data = htobe64(r.clock.count()); - c.BlockingWrite(&time_data, sizeof(time_data), true); + // Send vector clock for given key/value + for (size_t i = 0; i < nodes.size(); i++) { + uint64_t time_data = htobe64(r.clock[i].count()); + c.BlockingWrite(&time_data, sizeof(time_data), true); + } } void writer(std::string host, uint16_t port) { struct MsgOrDie { bool die; std::pair data; - std::chrono::microseconds clock; + Clocks clock; }; bool is_stop = false; Relay buffer; @@ -257,8 +275,7 @@ class AsyncReplicatedContainer { current::net::Connection conn(current::net::ClientSocket(host, port)); if (is_verbose) { - state.MutableUse( - [port](SharedState& state) { std::cout << "Writer connected on port " << port << std::endl; }); + state.MutableUse([port](SharedState&) { std::cout << "Writer connected on port " << port << std::endl; }); } while (true) { auto data_or_die = state.WaitFor( @@ -267,12 +284,13 @@ class AsyncReplicatedContainer { }, [this, &queue_id](SharedState& state) { if (state.die) { - return MsgOrDie{state.die, std::pair{}, current::time::Now()}; + return MsgOrDie{state.die, std::pair{}, Clocks()}; } auto data = state.replication_out[queue_id].front(); state.replication_out[queue_id].pop(); - auto clock = state.clock[data.first]; + state.clock[data.first].step(); + auto clock = 
state.clock[data.first].state(); MsgOrDie result = MsgOrDie{false, std::move(data), std::move(clock)}; return result; }, @@ -293,7 +311,7 @@ class AsyncReplicatedContainer { } } catch (const current::Exception& e) { if (show_network_err) { - state.MutableUse([&e](SharedState& state) { + state.MutableUse([&e](SharedState&) { std::cout << "error writer" << ": " << e.OriginalDescription() << std::endl; }); @@ -315,7 +333,7 @@ class AsyncReplicatedContainer { }); writers.push_back(std::thread([this, &node] { writer(node.host, node.port); })); if (is_verbose) { - std::cout << "Replicated with node " << nid << std::endl; + std::cout << "Replicated with node " << nid << " with clock_id " << clock_id << std::endl; } } reader = std::thread([this] { connection_handler(reader_port); }); @@ -363,7 +381,7 @@ class AsyncReplicatedContainer { } auto res = state.MutableUse([&key, this](SharedState& state) { auto val = state.data.at(key); - auto clock = state.clock[key]; + auto clock = state.clock[key].state(); Relay res; res.key = key; res.value = val; @@ -393,8 +411,9 @@ class AsyncReplicatedContainer { continue; } auto info = get_info(key); - stop = state.MutableUse([&info](SharedState& state) { - std::cout << "key= " << info.key << " val= " << info.value << " clock= " << info.clock.count() << std::endl; + stop = state.MutableUse([&info, this](SharedState& state) { + std::cout << "key= " << info.key << " val= " << info.value << " clock= " << info.clock[clock_id].count() + << std::endl; return state.die; }); }
diff --git a/examples/async_replication/vector_clock.h b/examples/async_replication/vector_clock.h
new file mode 100644
index 00000000..1e44d1a4
--- /dev/null
+++ b/examples/async_replication/vector_clock.h
@@ -0,0 +1,105 @@
+#pragma once
+
+#include "../../bricks/time/chrono.h"
+
+#include <algorithm>
+#include <chrono>
+#include <cstddef>
+#include <cstdint>
+#include <vector>
+
+// One timestamp per cluster node; the position in the vector is that node's clock id.
+using Clocks = std::vector<std::chrono::microseconds>;
+
+// Vector clock whose components are timestamps taken from current::time::Now().
+class VectorClock {
+ protected:
+  Clocks clock;
+  uint32_t local_id;
+
+ public:
+  // Builds a clock for a cluster of `size` nodes; `node_id` is the component owned by this
+  // process and must be < size, otherwise step() indexes out of range.
+  // Every component starts at the current time.
+  VectorClock(uint32_t size, uint32_t node_id) : clock(size, current::time::Now()), local_id(node_id) {}
+
+  // Lamport clock: a single component owned by node 0.
+  // Delegates to the sized constructor; building a temporary inside the body would leave
+  // this object with an empty vector and an uninitialized local_id.
+  VectorClock() : VectorClock(1, 0) {}
+
+  // is_conflicting() is virtual, so the destructor is virtual too and the copy/move
+  // operations it would otherwise suppress are defaulted explicitly.
+  virtual ~VectorClock() = default;
+  VectorClock(const VectorClock &) = default;
+  VectorClock(VectorClock &&) = default;
+  VectorClock &operator=(const VectorClock &) = default;
+  VectorClock &operator=(VectorClock &&) = default;
+
+  // Logical step: stamps the local component with the current time.
+  void step() { clock[local_id] = current::time::Now(); }
+
+  // Returns the current state for network transmission.
+  Clocks &state() { return clock; }
+
+  // Default conflict policy: the incoming clock conflicts unless the local clock happened
+  // before it or at the same time (local <= incoming).
+  virtual bool is_conflicting(const Clocks &to_compare) const { return !is_lte(clock, to_compare); }
+
+  // TODO add merge with lambda
+
+  // Merges `to_compare` into the local clock and returns true when the update is accepted.
+  // `force` skips the conflict check; it is used for inserts coming from other nodes.
+  // A clock of a different size is always rejected because it cannot be compared.
+  bool merge(const Clocks &to_compare, bool force) {
+    if (to_compare.size() != clock.size()) {
+      return false;
+    }
+    if (!force && is_conflicting(to_compare)) {
+      return false;
+    }
+    for (size_t i = 0; i < clock.size(); i++) {
+      // Assignment, not comparison: keep the newest timestamp seen for every node.
+      clock[i] = std::max(clock[i], to_compare[i]);
+    }
+    step();
+    return true;
+  }
+
+  // Happens at exactly the same moment.
+  // T = T' if T[i] = T'[i] for every i
+  static bool is_same(const Clocks &v1, const Clocks &v2) { return v1 == v2; }
+
+  // Happens earlier or at the same time.
+  // T <= T' if T[i] <= T'[i] for every i; clocks of different sizes are never ordered.
+  static bool is_lte(const Clocks &v1, const Clocks &v2) {
+    if (v1.size() != v2.size()) {
+      return false;
+    }
+    for (size_t i = 0; i < v1.size(); i++) {
+      if (v1[i] > v2[i]) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // v1 happens before v2.
+  // T < T' if T <= T' and T != T'
+  static bool is_early(const Clocks &v1, const Clocks &v2) { return is_lte(v1, v2) && !is_same(v1, v2); }
+
+  // v1 is not in sync with v2.
+  // T || T' if !(T <= T') and !(T' <= T)
+  static bool is_parallel(const Clocks &v1, const Clocks &v2) { return !is_lte(v1, v2) && !is_lte(v2, v1); }
+};
+
+// Stricter policy: an update is accepted only when the local clock is strictly earlier
+// than the incoming one; equal and parallel clocks are treated as conflicts.
+class StrictVectorClock : public VectorClock {
+ public:
+  using VectorClock::VectorClock;
+
+  bool is_conflicting(const Clocks &to_compare) const override {
+    return is_parallel(clock, to_compare) || !is_early(clock, to_compare);
+  }
+};