Skip to content

Commit

Permalink
Support rollback in sisl::StreamTracker (#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd authored Dec 18, 2024
1 parent 6ebd12d commit 3fa45bf
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 2 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class SISLConan(ConanFile):
name = "sisl"
version = "8.8.1"
version = "8.9.0"
homepage = "https://github.com/eBay/sisl"
description = "Library for fast data structures, utilities"
topics = ("ebay", "components", "core", "efficiency")
Expand Down
14 changes: 13 additions & 1 deletion include/sisl/fds/stream_tracker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class StreamTracker {
template < class... Args >
int64_t create(int64_t idx, Args&&... args) {
return do_update(
idx, [](T& data) { return false; }, true /* replace */, std::forward< Args >(args)...);
idx, [](T& ) { return false; }, true /* replace */, std::forward< Args >(args)...);
}

template < class... Args >
Expand All @@ -88,6 +88,18 @@ class StreamTracker {
m_comp_slot_bits.set_bits(start_bit, end_idx - start_idx + 1);
}

void rollback(int64_t new_end_idx) {
folly::SharedMutexWritePriority::ReadHolder holder(m_lock);
if ((new_end_idx < m_slot_ref_idx) ||
(new_end_idx >= (m_slot_ref_idx + int64_cast(m_active_slot_bits.size())))) {
throw std::out_of_range("Slot idx is not in range");
}

auto new_end_bit = new_end_idx - m_slot_ref_idx;
m_active_slot_bits.reset_bits(new_end_bit + 1, m_active_slot_bits.size() - new_end_bit - 1);
m_comp_slot_bits.reset_bits(new_end_bit + 1, m_comp_slot_bits.size() - new_end_bit - 1);
}

T& at(int64_t idx) const {
folly::SharedMutexWritePriority::ReadHolder holder(m_lock);
if (idx < m_slot_ref_idx) { throw std::out_of_range("Slot idx is not in range"); }
Expand Down
55 changes: 55 additions & 0 deletions src/fds/tests/test_stream_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ namespace {
struct TestData {
TestData(int val) : m_value(val) {}
int m_value = 0;

bool operator==(const TestData& other) const { return (m_value == other.m_value); }
};

struct StreamTrackerTest : public testing::Test {
Expand Down Expand Up @@ -111,6 +113,59 @@ TEST_F(StreamTrackerTest, ForceRealloc) {
EXPECT_EQ(get_mem_size(), prev_size * 2);
}

TEST_F(StreamTrackerTest, Rollback) {
static std::random_device s_rd{};
static std::default_random_engine s_engine{s_rd()};
std::uniform_int_distribution< int > gen{0, 999};

for (auto i = 0; i < 200; ++i) {
m_tracker.create(i, gen(s_engine));
}
EXPECT_EQ(m_tracker.active_upto(), 199);
EXPECT_EQ(m_tracker.completed_upto(), -1);
m_tracker.complete(0, 99);
EXPECT_EQ(m_tracker.active_upto(), 199);
EXPECT_EQ(m_tracker.completed_upto(), 99);

m_tracker.rollback(169);
EXPECT_EQ(m_tracker.active_upto(), 169);
EXPECT_EQ(m_tracker.completed_upto(), 99);

m_tracker.complete(100, 169);
EXPECT_EQ(m_tracker.active_upto(), 169);
EXPECT_EQ(m_tracker.completed_upto(), 169);

auto new_val1 = gen(s_engine);
auto new_val2 = gen(s_engine);
m_tracker.create(170, new_val1);
m_tracker.create(172, new_val2);
EXPECT_EQ(m_tracker.active_upto(), 170);
EXPECT_EQ(m_tracker.completed_upto(), 169);
m_tracker.complete(170, 170);
EXPECT_EQ(m_tracker.completed_upto(), 170);
m_tracker.create_and_complete(171, new_val2);
m_tracker.complete(172, 172);

EXPECT_EQ(m_tracker.completed_upto(), 172);
EXPECT_EQ(m_tracker.at(170), TestData{new_val1});
EXPECT_EQ(m_tracker.at(171), TestData{new_val2});
EXPECT_EQ(m_tracker.at(172), TestData{new_val2});

bool exception_hit{false};
m_tracker.truncate(80);
try {
m_tracker.rollback(1);
} catch (const std::out_of_range& e) { exception_hit = true; }
EXPECT_EQ(exception_hit, true);

exception_hit = false;
m_tracker.truncate(173);
try {
m_tracker.rollback(1);
} catch (const std::out_of_range& e) { exception_hit = true; }
EXPECT_EQ(exception_hit, true);
}

int main(int argc, char* argv[]) {
::testing::InitGoogleTest(&argc, argv);
auto ret = RUN_ALL_TESTS();
Expand Down

0 comments on commit 3fa45bf

Please sign in to comment.