From 48b92b912eeb43a8ee5d96e52e5b968a0ef6a6ea Mon Sep 17 00:00:00 2001 From: Andrei Drozdov Date: Thu, 11 Jul 2024 01:00:24 +0200 Subject: [PATCH] MergeStrategy implementation for vector clock --- bricks/distributed/test.cc | 23 ++---- bricks/distributed/vector_clock.h | 110 ++++++++++++++++------------- examples/async_replication/node.cc | 2 +- 3 files changed, 64 insertions(+), 71 deletions(-) diff --git a/bricks/distributed/test.cc b/bricks/distributed/test.cc index d510b359..bb3e2b99 100644 --- a/bricks/distributed/test.cc +++ b/bricks/distributed/test.cc @@ -84,25 +84,10 @@ TEST(VectorClock, Merge) { EXPECT_EQ(v.state()[1], cur_state[1]); } -TEST(VectorClock, CustomValidator) { - auto base_time = current::time::Now(); - Clocks c1 = {1, 2}; - auto v = VectorClock(c1, 0); - - Clocks c2 = {0, 1}; - // Check custom validation lambda - merge all events - EXPECT_EQ(true, v.merge(c2, [](Clocks&, Clocks&) { return false; })); - auto cur_state = v.state(); - // local time should be updated after merge - EXPECT_GT(cur_state[0], c1[0]); - // merged time should be equal to max(t[1], t'[1]) = base + 2 - EXPECT_EQ(c2[1] + 1, cur_state[1]); -} - TEST(VectorClock, StrictMerge) { auto base_time = current::time::Now(); Clocks c1 = {1, 2}; - auto v = StrictVectorClock(c1, 0); + auto v = VectorClock(c1, 0); // Merge correct update Clocks c2 = {2, 3}; @@ -116,7 +101,7 @@ TEST(VectorClock, StrictMerge) { // Merge equals using strict validation c1 = {10, 20}; - v = StrictVectorClock(c1, 0); + v = VectorClock(c1, 0); cur_state = v.state(); c2 = {10, 20}; EXPECT_EQ(false, v.merge(c2)); @@ -124,7 +109,7 @@ TEST(VectorClock, StrictMerge) { EXPECT_EQ(v.state()[1], cur_state[1]); // Merge partially equals - v = StrictVectorClock(c1, 0); + v = VectorClock(c1, 0); cur_state = v.state(); c2 = {1, 20}; // 0 is equeal, 1 is greater - not ok to merge @@ -133,7 +118,7 @@ TEST(VectorClock, StrictMerge) { EXPECT_EQ(v.state()[1], cur_state[1]); // Merge incorrect - v = StrictVectorClock(c1, 0); + v = VectorClock(c1, 0); cur_state = v.state(); c2 = {0, 1}; EXPECT_EQ(false, v.merge(c2)); diff --git a/bricks/distributed/vector_clock.h b/bricks/distributed/vector_clock.h index 525c3805..aa3b20ee 100644 --- a/bricks/distributed/vector_clock.h +++ b/bricks/distributed/vector_clock.h @@ -4,59 +4,21 @@ typedef std::vector Clocks; -class VectorClock { - protected: - Clocks clock; - uint32_t local_id; - +class MergeStrategy { public: - explicit VectorClock(uint32_t size, uint32_t node_id) { - // Set local process id and cluster size - local_id = node_id; - clock.resize(size, 0); - } - - explicit VectorClock(Clocks &v, uint32_t node_id) { - // Constructor for existing clock, used for inserting new data - local_id = node_id; - clock = Clocks(v.begin(), v.end()); + std::pair merge(Clocks &v1, Clocks &v2, std::function validator) { + // Basic merge strategy: + // 1. always merge (even in case of conflict) + // 2. default conflict function check that v1 <= v2 + return std::pair{!validator(v1, v2), true}; } - - explicit VectorClock() { - // Lamport clocks for size=1 - local_id = 0; - clock.push_back(0); - } - - void step() { - // T[i] = T[i] + 1 for logical step - clock[local_id]++; - } - - Clocks &state() { - // Returns current state for network transmission - return clock; - } - + std::pair merge(Clocks &v1, Clocks &v2) { return merge(v1, v2, is_conflicting); } static bool is_conflicting(Clocks &v1, Clocks &v2) { - // default implementation + // basic implementation // returns true if there are no conflicts for merging return !is_lte(v1, v2); } - bool merge(Clocks &to_compare, std::function validator) { - // Do we need to generalize validation, or it's ok to always merge vectors? - // It's valid for replication example, but maybe in some cases we should not merge - bool is_data_conflicted = validator(clock, to_compare); - for (size_t i = 0; i < clock.size(); i++) { - clock[i] = std::max(clock[i], to_compare[i]); - } - step(); - return !is_data_conflicted; - } - - bool merge(Clocks &to_compare) { return merge(to_compare, is_conflicting); } - static bool is_same(Clocks &v1, Clocks &v2) { // Happens on exactly same moment // T=T' if T[i] = T'[i] for any i @@ -92,13 +54,59 @@ class VectorClock { } }; -class StrictVectorClock : public VectorClock { - using VectorClock::VectorClock; - +class StrictMergeStrategy : public MergeStrategy { public: static bool is_conflicting(Clocks &v1, Clocks &v2) { // Check if v1 is in sync with v2 and v1 is strictly early then v2 return !(!is_parallel(v1, v2) && is_early(v1, v2)); } - bool merge(Clocks &to_compare) { return VectorClock::merge(to_compare, is_conflicting); } -}; \ No newline at end of file + std::pair merge(Clocks &v1, Clocks &v2) { return MergeStrategy::merge(v1, v2, is_conflicting); } +}; + +template +class VectorClock { + protected: + Clocks clock; + STRATEGY strategy; + uint32_t local_id; + + public: + explicit VectorClock(uint32_t size, uint32_t node_id) { + // Set local process id and cluster size + local_id = node_id; + clock.resize(size, 0); + } + + explicit VectorClock(Clocks &v, uint32_t node_id) { + // Constructor for existing clock, used for inserting new data + local_id = node_id; + clock = Clocks(v.begin(), v.end()); + } + + explicit VectorClock() { + // Lamport clocks for size=1 + local_id = 0; + clock.push_back(0); + } + + void step() { + // T[i] = T[i] + 1 for logical step + clock[local_id]++; + } + + Clocks &state() { + // Returns current state for network transmission + return clock; + } + + bool merge(Clocks &to_compare) { + auto merge_results = strategy.merge(clock, to_compare); + if (merge_results.second) { + for (size_t i = 0; i < clock.size(); i++) { + clock[i] = std::max(clock[i], to_compare[i]); + } + step(); + } + return merge_results.first; + } +}; diff --git a/examples/async_replication/node.cc b/examples/async_replication/node.cc index 239b6745..4324d2d3 100644 --- a/examples/async_replication/node.cc +++ b/examples/async_replication/node.cc @@ -28,7 +28,7 @@ int main(int argc, char** argv) { false, 10}; - AsyncReplicatedContainer storage(conf); + AsyncReplicatedContainer, Uint32Value> storage(conf); storage.start(); storage.start_monitor(keys, FLAGS_monitor_delay); while (true) {