
Fast forward API to truncate upto a log index #334

Merged · 6 commits · Feb 28, 2024
2 changes: 1 addition & 1 deletion conanfile.py
@@ -2,7 +2,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "3.6.14"
version = "3.7.1"

homepage = "https://github.corp.ebay.com/SDS/homestore"
description = "HomeStore"
7 changes: 7 additions & 0 deletions src/homelogstore/log_dev.cpp
@@ -682,6 +682,13 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa
}

void LogDevMetadata::persist() {
#ifdef _PRERELEASE
if (homestore_flip->test_flip("logstore_test_skip_persist")) {
LOGINFO("skipping persist of logdev metadata");
return;
}
#endif

if (m_meta_mgr_cookie) {
MetaBlkMgrSI()->update_sub_sb(static_cast< const void* >(m_raw_buf->bytes), m_raw_buf->size, m_meta_mgr_cookie);
} else {
57 changes: 53 additions & 4 deletions src/homelogstore/log_store.cpp
@@ -228,7 +228,7 @@ void HomeLogStore::truncate(const logstore_seq_num_t upto_seq_num, const bool in
// First try to block the flushing of logdevice and if we are successfully able to do, then
auto shared_this{shared_from_this()};
const bool locked_now{m_logdev.try_lock_flush([shared_this, upto_seq_num, in_memory_truncate_only]() {
shared_this->do_truncate(upto_seq_num);
shared_this->do_truncate(upto_seq_num, false /*persist meta now*/);
if (!in_memory_truncate_only) {
[[maybe_unused]] const auto key{shared_this->get_family().do_device_truncate()};
}
@@ -237,13 +237,62 @@
if (locked_now) { m_logdev.unlock_flush(); }
}

void HomeLogStore::sync_truncate(const logstore_seq_num_t upto_seq_num, const bool in_memory_truncate_only) {
// Check if we need to fill any gaps in the logstore
auto const last_idx{get_contiguous_issued_seq_num(std::max(0l, truncated_upto()))};
HS_REL_ASSERT_GE(last_idx, 0l, "Negative sequence number: {} [Logstore id ={}]", last_idx, m_store_id);
auto const next_slot{last_idx + 1};
for (auto curr_idx = next_slot; upto_seq_num >= curr_idx; ++curr_idx) {
fill_gap(curr_idx);
}

#ifndef NDEBUG
const auto s{m_safe_truncation_boundary.seq_num.load(std::memory_order_acquire)};
// Don't check this if we don't know our truncation boundary. This call is made to inform us of the
// correct truncation point.
if (s != -1) {
HS_DBG_ASSERT_LE(upto_seq_num, get_contiguous_completed_seq_num(s),
"Logstore {} expects truncation to be contiguously completed", m_store_id);
}
#endif

struct Context {
std::mutex truncate_mutex;
std::condition_variable truncate_cv;
bool truncate_done{false};
};
auto ctx{std::make_shared< Context >()};

auto shared_this{shared_from_this()};
const bool locked_now{m_logdev.try_lock_flush([shared_this, upto_seq_num, in_memory_truncate_only, ctx]() {
shared_this->do_truncate(upto_seq_num, true /*persist meta now*/);
if (!in_memory_truncate_only) {
[[maybe_unused]] const auto key{shared_this->get_family().do_device_truncate()};
}
{
std::unique_lock< std::mutex > lk{ctx->truncate_mutex};
ctx->truncate_done = true;
}
ctx->truncate_cv.notify_one();
})};

if (locked_now) {
m_logdev.unlock_flush();
} else {
{
std::unique_lock< std::mutex > lk{ctx->truncate_mutex};
ctx->truncate_cv.wait(lk, [&ctx] { return ctx->truncate_done; });
}
}
}

// NOTE: This method assumes the flush lock is already acquired by the caller
void HomeLogStore::do_truncate(const logstore_seq_num_t upto_seq_num) {
void HomeLogStore::do_truncate(const logstore_seq_num_t upto_seq_num, const bool persist_now) {
m_records.truncate(upto_seq_num);
m_safe_truncation_boundary.seq_num.store(upto_seq_num, std::memory_order_release);

// Need to update the superblock with meta, we don't persist yet, will be done as part of log dev truncation
m_logdev.update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, false /* persist_now */);
// Need to update the superblock with meta; if we don't persist now, it will be done as part of log dev truncation
m_logdev.update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, persist_now /* persist_now */);

const int ind{search_max_le(upto_seq_num)};
if (ind < 0) {
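
For context on the change above: when sync_truncate() cannot take the flush lock inline (locked_now is false), the caller blocks on a condition variable until the deferred callback has executed. Below is a minimal, self-contained sketch of that wait pattern in standard C++; try_lock_flush here is a stand-in that always defers the callback to a background thread, and SyncPoint / sync_truncate_like are hypothetical names, not HomeStore APIs.

#include <chrono>
#include <condition_variable>
#include <functional>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>

struct SyncPoint {
    std::mutex mtx;
    std::condition_variable cv;
    bool done{false};
};

// Stand-in for m_logdev.try_lock_flush(): this mock always defers the callback to a
// background thread, so it never runs it inline and returns false.
static bool try_lock_flush(std::function< void() > cb) {
    std::thread([cb = std::move(cb)]() {
        std::this_thread::sleep_for(std::chrono::milliseconds(10)); // pretend a flush is in progress
        cb();
    }).detach();
    return false;
}

static void sync_truncate_like() {
    auto sp = std::make_shared< SyncPoint >();
    const bool locked_now = try_lock_flush([sp]() {
        // ... do_truncate(upto_seq_num, true /* persist meta now */) would run here ...
        {
            std::lock_guard< std::mutex > lk{sp->mtx};
            sp->done = true;
        }
        sp->cv.notify_one();
    });

    if (!locked_now) {
        // Callback was deferred; wait until it has actually run before returning.
        std::unique_lock< std::mutex > lk{sp->mtx};
        sp->cv.wait(lk, [&sp] { return sp->done; });
    }
    std::cout << "truncation completed and metadata persisted\n";
}

int main() { sync_truncate_like(); }
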
14 changes: 11 additions & 3 deletions src/homelogstore/log_store.hpp
@@ -45,7 +45,7 @@ struct log_dump_req {
std::shared_ptr< HomeLogStore > log_store; // if null all log stores are dumped
logstore_seq_num_t start_seq_num; // empty_key if from start of log file
logstore_seq_num_t end_seq_num; // empty_key if till last log entry
logstore_seq_num_t batch_size = 0; // Size of the output batch.
logstore_seq_num_t batch_size = 0; // Size of the output batch.
};

struct logstore_record {
@@ -94,7 +94,7 @@ struct logstore_req {
if (req->is_internal_req) { sisl::ObjectAllocator< logstore_req >::deallocate(req); }
}

friend class sisl::ObjectAllocator<logstore_req>;
friend class sisl::ObjectAllocator< logstore_req >;

private:
logstore_req() = default;
@@ -396,6 +396,14 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
void truncate(const logstore_seq_num_t upto_seq_num, const bool in_memory_truncate_only = true);

/**
* @brief Truncate the logs for this log store up to the seq_num provided (inclusive) and persist the metablock
* before returning. If the next available slot is less than upto_seq_num, the gaps are filled with dummy entries.
* This API ensures the log idx is durable after the truncation.
* See the truncate method above for more details on the parameters.
*/
void sync_truncate(const logstore_seq_num_t upto_seq_num, const bool in_memory_truncate_only = true);

/**
* @brief Fill the gap in the seq_num with a dummy value. This ensures that get_contiguous_issued and completed
* seq_num methods move forward. The filled data is not readable and any attempt to read this seq_num will
@@ -513,7 +521,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
void on_log_found(const logstore_seq_num_t seq_num, const logdev_key ld_key, const logdev_key flush_ld_key,
const log_buffer buf);
void on_batch_completion(const logdev_key& flush_batch_ld_key);
void do_truncate(const logstore_seq_num_t upto_seq_num);
void do_truncate(const logstore_seq_num_t upto_seq_num, const bool persist_now);
[[nodiscard]] int search_max_le(const logstore_seq_num_t input_sn);

private:
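
The doc comment for sync_truncate() above describes the fast-forward semantics: any missing slots up to upto_seq_num are filled with dummy entries, then everything through upto_seq_num is truncated, so both the start index and the next writable slot become upto_seq_num + 1. The toy model below illustrates only those semantics; ToyLogStore and its members are hypothetical and are not the HomeStore implementation.

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <map>

struct ToyLogStore {
    std::map< int64_t, bool > slots; // seq_num -> is_dummy
    int64_t truncated_upto{-1};      // highest truncated seq_num
    int64_t next_slot{0};            // next index to be issued

    void write(int64_t seq) { slots[seq] = false; next_slot = std::max(next_slot, seq + 1); }
    void fill_gap(int64_t seq) { slots[seq] = true; next_slot = std::max(next_slot, seq + 1); }

    void sync_truncate(int64_t upto) {
        for (int64_t s = next_slot; s <= upto; ++s) { fill_gap(s); } // fill holes up to the target
        slots.erase(slots.begin(), slots.upper_bound(upto));         // drop everything <= upto
        truncated_upto = upto;
        // A real implementation would persist the truncation boundary here before returning.
    }
};

int main() {
    ToyLogStore ls;
    for (int64_t i = 0; i < 100; ++i) { ls.write(i); } // log holds [0, 99]
    ls.sync_truncate(349);                             // fast-forward well past the current tail
    std::cout << "start=" << (ls.truncated_upto + 1) << " next=" << ls.next_slot << '\n'; // both report 350
}
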
104 changes: 103 additions & 1 deletion src/homelogstore/tests/test_log_store.cpp
@@ -46,6 +46,7 @@

#include "api/vol_interface.hpp"
#include "test_common/homestore_test_common.hpp"
#include "engine/common/homestore_flip.hpp"

#include "../log_store.hpp"

@@ -114,6 +115,9 @@ class SampleLogStoreClient {
m_log_store->register_log_found_cb(bind_this(SampleLogStoreClient::on_log_found, 3));
}

auto get_log_store() { return m_log_store; }
static void set_validate_on_log_found(const bool flag) { validate_on_log_found = flag; }

void reset_recovery() {
m_n_recovered_lsns = 0;
m_n_recovered_truncated_lsns = 0;
@@ -305,6 +309,7 @@ class SampleLogStoreClient {
}

void on_log_found(const logstore_seq_num_t lsn, const log_buffer buf, void* const ctx) {
if (!validate_on_log_found) { return; }
LOGDEBUG("Recovered lsn {}:{} with log data of size {}", m_log_store->get_store_id(), lsn, buf.size())
EXPECT_LE(lsn, m_cur_lsn.load()) << "Recovered incorrect lsn " << m_log_store->get_store_id() << ":" << lsn
<< "Expected less than cur_lsn " << m_cur_lsn.load();
@@ -370,6 +375,7 @@ class SampleLogStoreClient {
private:
static constexpr uint32_t max_data_size = 1024;
static uint64_t s_max_flush_multiple;
inline static bool validate_on_log_found = true;

logstore_id_t m_store_id;
test_log_store_comp_cb_t m_comp_cb;
@@ -482,7 +488,7 @@ class SampleDB {

if (SISL_OPTIONS.count("http_port")) {
test_common::set_fixed_http_port(SISL_OPTIONS["http_port"].as< uint32_t >());
}else {
} else {
test_common::set_random_http_port();
}
VolInterface::init(params, restart);
@@ -554,6 +560,8 @@ class SampleDB {
return m_highest_log_idx[fid].load();
}

std::vector< std::unique_ptr< SampleLogStoreClient > >& log_store_clients() { return m_log_store_clients; }

private:
const static std::string s_fpath_root;
std::vector< std::string > m_dev_names;
@@ -897,6 +905,100 @@ class LogStoreTest : public ::testing::Test {
uint32_t m_holes_per_batch{0};
};

static uint64_t curr_trunc_start(std::shared_ptr< HomeLogStore >& _log_store) {
return std::max(0l, _log_store->truncated_upto());
}

TEST_F(LogStoreTest, TruncateDurability) {
auto _hs_log_store =
SampleDB::instance()
.log_store_clients()
.emplace_back(std::make_unique< SampleLogStoreClient >(
HomeLogStoreMgr::CTRL_LOG_FAMILY_IDX, [](logstore_family_id_t, logstore_seq_num_t, logdev_key) {}))
.get()
->get_log_store();

auto const start_index = [](std::shared_ptr< HomeLogStore >& _log_store) {
return curr_trunc_start(_log_store) + 1;
};
auto const next_slot = [](std::shared_ptr< HomeLogStore >& _log_store) {
auto const last_idx = _log_store->get_contiguous_issued_seq_num(curr_trunc_start(_log_store));
EXPECT_TRUE(last_idx >= 0l);
return static_cast< uint64_t >(last_idx) + 1;
};

uint64_t last_idx{100};
for (auto i = 0u; i < last_idx; ++i) {
bool io_memory{false};
auto* const d{SampleLogStoreClient::prepare_data(i, io_memory)};

EXPECT_TRUE(_hs_log_store->write_sync(next_slot(_hs_log_store),
{reinterpret_cast< uint8_t* >(d), d->total_size(), false}));

if (io_memory) {
iomanager.iobuf_free(reinterpret_cast< uint8_t* >(d));
} else {
std::free(static_cast< void* >(d));
}
}
// ls range should be [1, 100]
EXPECT_EQ(start_index(_hs_log_store), 1ul);
EXPECT_EQ(next_slot(_hs_log_store), last_idx + 1);

// set flip to skip logdev metablk persistence
#ifdef _PRERELEASE
flip::FlipClient* fc{HomeStoreFlip::client_instance()};
flip::FlipFrequency freq;
freq.set_count(1);
freq.set_percent(100);
fc->inject_noreturn_flip("logstore_test_skip_persist", {}, freq);
#endif

// fill gaps and truncate up to 150. ls range is now [151, 151]
uint64_t truncate_upto{150};
for (auto curr_idx = next_slot(_hs_log_store); truncate_upto >= curr_idx; ++curr_idx) {
_hs_log_store->fill_gap(curr_idx);
}
_hs_log_store->truncate(truncate_upto);
// validate the start and end of the ls
EXPECT_EQ(start_index(_hs_log_store), truncate_upto + 1);
EXPECT_EQ(next_slot(_hs_log_store), truncate_upto + 1);

// restart homestore
const auto num_devs{SISL_OPTIONS["num_devs"].as< uint32_t >()}; // num devices
const auto dev_size_bytes{SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024};
const auto num_threads{SISL_OPTIONS["num_threads"].as< uint32_t >()};
const auto num_logstores{SISL_OPTIONS["num_logstores"].as< uint32_t >() + 1};
SampleLogStoreClient::set_validate_on_log_found(false);
SampleDB::instance().start_homestore(num_devs, dev_size_bytes, num_threads, num_logstores, true /* restart */);

_hs_log_store = SampleDB::instance().log_store_clients().back()->get_log_store();
// We are simulating a crash by setting the logstore_test_skip_persist flip, which skips persisting the metablk.
// This invalidates the truncate call above and restores the logstore range to [1, 100]
EXPECT_EQ(1ul, start_index(_hs_log_store));
EXPECT_EQ(last_idx + 1, next_slot(_hs_log_store));

// fast_forward should be resilient to crashes and should be able to recover

uint64_t fast_forward_upto{350};
_hs_log_store->sync_truncate(fast_forward_upto);
#ifdef _PRERELEASE
flip::FlipFrequency freq1;
freq1.set_count(1);
freq1.set_percent(100);
fc->inject_noreturn_flip("logstore_test_skip_persist", {}, freq1);
#endif
EXPECT_EQ(start_index(_hs_log_store), fast_forward_upto + 1);
EXPECT_EQ(next_slot(_hs_log_store), fast_forward_upto + 1);

SampleDB::instance().start_homestore(num_devs, dev_size_bytes, num_threads, num_logstores, true /* restart */);
EXPECT_EQ(start_index(_hs_log_store), fast_forward_upto + 1);
EXPECT_EQ(next_slot(_hs_log_store), fast_forward_upto + 1);

EXPECT_TRUE(SampleDB::instance().delete_log_store(_hs_log_store->get_store_id()));
SampleLogStoreClient::set_validate_on_log_found(true);
}

TEST_F(LogStoreTest, BurstRandInsertThenTruncate) {
const auto num_records{SISL_OPTIONS["num_records"].as< uint32_t >()};
const auto iterations{SISL_OPTIONS["iterations"].as< uint32_t >()};