Skip to content

Commit

Permalink
MergeStrategy implementation for vector clock
Browse files Browse the repository at this point in the history
  • Loading branch information
amdrozdov committed Jul 10, 2024
1 parent f8d091e commit 48b92b9
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 71 deletions.
23 changes: 4 additions & 19 deletions bricks/distributed/test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,25 +84,10 @@ TEST(VectorClock, Merge) {
EXPECT_EQ(v.state()[1], cur_state[1]);
}

TEST(VectorClock, CustomValidator) {
auto base_time = current::time::Now();
Clocks c1 = {1, 2};
auto v = VectorClock(c1, 0);

Clocks c2 = {0, 1};
// Check custom validation lambda - merge all events
EXPECT_EQ(true, v.merge(c2, [](Clocks&, Clocks&) { return false; }));
auto cur_state = v.state();
// local time should be updated after merge
EXPECT_GT(cur_state[0], c1[0]);
// merged time should be equal to max(t[1], t'[1]) = base + 2
EXPECT_EQ(c2[1] + 1, cur_state[1]);
}

TEST(VectorClock, StrictMerge) {
auto base_time = current::time::Now();
Clocks c1 = {1, 2};
auto v = StrictVectorClock(c1, 0);
auto v = VectorClock<StrictMergeStrategy>(c1, 0);

// Merge correct update
Clocks c2 = {2, 3};
Expand All @@ -116,15 +101,15 @@ TEST(VectorClock, StrictMerge) {

// Merge equals using strict validation
c1 = {10, 20};
v = StrictVectorClock(c1, 0);
v = VectorClock<StrictMergeStrategy>(c1, 0);
cur_state = v.state();
c2 = {10, 20};
EXPECT_EQ(false, v.merge(c2));
EXPECT_EQ(v.state()[0], cur_state[0] + 1);
EXPECT_EQ(v.state()[1], cur_state[1]);

// Merge partially equals
v = StrictVectorClock(c1, 0);
v = VectorClock<StrictMergeStrategy>(c1, 0);
cur_state = v.state();
c2 = {1, 20};
// 0 is equal, 1 is greater - not ok to merge
Expand All @@ -133,7 +118,7 @@ TEST(VectorClock, StrictMerge) {
EXPECT_EQ(v.state()[1], cur_state[1]);

// Merge incorrect
v = StrictVectorClock(c1, 0);
v = VectorClock<StrictMergeStrategy>(c1, 0);
cur_state = v.state();
c2 = {0, 1};
EXPECT_EQ(false, v.merge(c2));
Expand Down
110 changes: 59 additions & 51 deletions bricks/distributed/vector_clock.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,21 @@

typedef std::vector<uint64_t> Clocks;

class VectorClock {
protected:
Clocks clock;
uint32_t local_id;

class MergeStrategy {
public:
explicit VectorClock(uint32_t size, uint32_t node_id) {
// Set local process id and cluster size
local_id = node_id;
clock.resize(size, 0);
}

explicit VectorClock(Clocks &v, uint32_t node_id) {
// Constructor for existing clock, used for inserting new data
local_id = node_id;
clock = Clocks(v.begin(), v.end());
std::pair<bool, bool> merge(Clocks &v1, Clocks &v2, std::function<bool(Clocks &v1, Clocks &v2)> validator) {
// Basic merge strategy:
// 1. always merge (even in case of conflict)
// 2. default conflict function check that v1 <= v2
return std::pair<bool, bool>{!validator(v1, v2), true};
}

explicit VectorClock() {
// Lamport clocks for size=1
local_id = 0;
clock.push_back(0);
}

void step() {
// T[i] = T[i] + 1 for logical step
clock[local_id]++;
}

Clocks &state() {
// Returns current state for network transmission
return clock;
}

std::pair<bool, bool> merge(Clocks &v1, Clocks &v2) { return merge(v1, v2, is_conflicting); }
static bool is_conflicting(Clocks &v1, Clocks &v2) {
// default implementation
// basic implementation
// returns true if there are no conflicts for merging
return !is_lte(v1, v2);
}

bool merge(Clocks &to_compare, std::function<bool(Clocks &v1, Clocks &v2)> validator) {
// Do we need to generalize validation, or it's ok to always merge vectors?
// It's valid for replication example, but maybe in some cases we should not merge
bool is_data_conflicted = validator(clock, to_compare);
for (size_t i = 0; i < clock.size(); i++) {
clock[i] = std::max(clock[i], to_compare[i]);
}
step();
return !is_data_conflicted;
}

bool merge(Clocks &to_compare) { return merge(to_compare, is_conflicting); }

static bool is_same(Clocks &v1, Clocks &v2) {
// Happens on exactly same moment
// T=T' if T[i] = T'[i] for any i
Expand Down Expand Up @@ -92,13 +54,59 @@ class VectorClock {
}
};

class StrictVectorClock : public VectorClock {
using VectorClock::VectorClock;

class StrictMergeStrategy : public MergeStrategy {
public:
static bool is_conflicting(Clocks &v1, Clocks &v2) {
// Check if v1 is in sync with v2 and v1 is strictly early then v2
return !(!is_parallel(v1, v2) && is_early(v1, v2));
}
bool merge(Clocks &to_compare) { return VectorClock::merge(to_compare, is_conflicting); }
};
std::pair<bool, bool> merge(Clocks &v1, Clocks &v2) { return MergeStrategy::merge(v1, v2, is_conflicting); }
};

template <class STRATEGY = MergeStrategy>
class VectorClock {
protected:
Clocks clock;
STRATEGY strategy;
uint32_t local_id;

public:
explicit VectorClock(uint32_t size, uint32_t node_id) {
// Set local process id and cluster size
local_id = node_id;
clock.resize(size, 0);
}

explicit VectorClock(Clocks &v, uint32_t node_id) {
// Constructor for existing clock, used for inserting new data
local_id = node_id;
clock = Clocks(v.begin(), v.end());
}

explicit VectorClock() {
// Lamport clocks for size=1
local_id = 0;
clock.push_back(0);
}

void step() {
// T[i] = T[i] + 1 for logical step
clock[local_id]++;
}

Clocks &state() {
// Returns current state for network transmission
return clock;
}

bool merge(Clocks &to_compare) {
auto merge_results = strategy.merge(clock, to_compare);
if (merge_results.second) {
for (size_t i = 0; i < clock.size(); i++) {
clock[i] = std::max(clock[i], to_compare[i]);
}
step();
}
return merge_results.first;
}
};
2 changes: 1 addition & 1 deletion examples/async_replication/node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ int main(int argc, char** argv) {
false,
10};

AsyncReplicatedContainer<StrictVectorClock, Uint32Value> storage(conf);
AsyncReplicatedContainer<VectorClock<StrictMergeStrategy>, Uint32Value> storage(conf);
storage.start();
storage.start_monitor(keys, FLAGS_monitor_delay);
while (true) {
Expand Down

0 comments on commit 48b92b9

Please sign in to comment.