Skip to content

Commit

Permalink
merge stable/v3.x to thin branch (#341)
Browse files Browse the repository at this point in the history
* Fast forward API to truncate upto a log index (#334)

* add truncate_sync api to persist log store meta immediately during truncation

Co-authored-by: Ravi Nagarjun Akella <raakella1@$HOSTNAME>

* fix the check in UT which is only valid in  prerelease build (#338)

Co-authored-by: Ravi Nagarjun Akella <raakella1@$HOSTNAME>

* skip non zero request buffers

* merge stable/v3.x to thin branch

---------

Co-authored-by: raakella1 <[email protected]>
Co-authored-by: Ravi Nagarjun Akella <raakella1@$HOSTNAME>
  • Loading branch information
3 people authored Mar 4, 2024
1 parent a86d6c6 commit b3d7e28
Show file tree
Hide file tree
Showing 7 changed files with 179 additions and 9 deletions.
1 change: 1 addition & 0 deletions .github/workflows/merge_conan_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
branches:
- master
- stable/v3.x
- thin_provisioning

jobs:
Build:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/pr_conan_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ on:
branches:
- master
- stable/v3.x
- thin_provisioning

jobs:
Build:
Expand Down
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "3.6.14"
version = "3.7.1"

homepage = "https://github.corp.ebay.com/SDS/homestore"
description = "HomeStore"
Expand Down
7 changes: 7 additions & 0 deletions src/homelogstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,13 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa
}

void LogDevMetadata::persist() {
#ifdef _PRERELEASE
if (homestore_flip->test_flip("logstore_test_skip_persist")) {
LOGINFO("skipping persist of logdev metadata");
return;
}
#endif

if (m_meta_mgr_cookie) {
MetaBlkMgrSI()->update_sub_sb(static_cast< const void* >(m_raw_buf->bytes), m_raw_buf->size, m_meta_mgr_cookie);
} else {
Expand Down
57 changes: 53 additions & 4 deletions src/homelogstore/log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ void HomeLogStore::truncate(const logstore_seq_num_t upto_seq_num, const bool in
// First try to block the flushing of logdevice and if we are successfully able to do, then
auto shared_this{shared_from_this()};
const bool locked_now{m_logdev.try_lock_flush([shared_this, upto_seq_num, in_memory_truncate_only]() {
shared_this->do_truncate(upto_seq_num);
shared_this->do_truncate(upto_seq_num, false /*persist meta now*/);
if (!in_memory_truncate_only) {
[[maybe_unused]] const auto key{shared_this->get_family().do_device_truncate()};
}
Expand All @@ -237,13 +237,62 @@ void HomeLogStore::truncate(const logstore_seq_num_t upto_seq_num, const bool in
if (locked_now) { m_logdev.unlock_flush(); }
}

void HomeLogStore::sync_truncate(const logstore_seq_num_t upto_seq_num, const bool in_memory_truncate_only) {
// Check if we need to fill any gaps in the logstore
auto const last_idx{get_contiguous_issued_seq_num(std::max(0l, truncated_upto()))};
HS_REL_ASSERT_GE(last_idx, 0l, "Negative sequence number: {} [Logstore id ={}]", last_idx, m_store_id);
auto const next_slot{last_idx + 1};
for (auto curr_idx = next_slot; upto_seq_num >= curr_idx; ++curr_idx) {
fill_gap(curr_idx);
}

#ifndef NDEBUG
const auto s{m_safe_truncation_boundary.seq_num.load(std::memory_order_acquire)};
// Don't check this if we don't know our truncation boundary. The call is made to inform us about
// correct truncation point.
if (s != -1) {
HS_DBG_ASSERT_LE(upto_seq_num, get_contiguous_completed_seq_num(s),
"Logstore {} expects truncation to be contiguously completed", m_store_id);
}
#endif

struct Context {
std::mutex truncate_mutex;
std::condition_variable truncate_cv;
bool truncate_done{false};
};
auto ctx{std::make_shared< Context >()};

auto shared_this{shared_from_this()};
const bool locked_now{m_logdev.try_lock_flush([shared_this, upto_seq_num, in_memory_truncate_only, ctx]() {
shared_this->do_truncate(upto_seq_num, true /*persist meta now*/);
if (!in_memory_truncate_only) {
[[maybe_unused]] const auto key{shared_this->get_family().do_device_truncate()};
}
{
std::unique_lock< std::mutex > lk{ctx->truncate_mutex};
ctx->truncate_done = true;
}
ctx->truncate_cv.notify_one();
})};

if (locked_now) {
m_logdev.unlock_flush();
} else {
{
std::unique_lock< std::mutex > lk{ctx->truncate_mutex};
ctx->truncate_cv.wait(lk, [&ctx] { return ctx->truncate_done; });
}
}
}

// NOTE: This method assumes the flush lock is already acquired by the caller
void HomeLogStore::do_truncate(const logstore_seq_num_t upto_seq_num) {
void HomeLogStore::do_truncate(const logstore_seq_num_t upto_seq_num, const bool persist_now) {
m_records.truncate(upto_seq_num);
m_safe_truncation_boundary.seq_num.store(upto_seq_num, std::memory_order_release);

// Need to update the superblock with meta, we don't persist yet, will be done as part of log dev truncation
m_logdev.update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, false /* persist_now */);
// Need to update the superblock with meta, in case we don't persist yet, will be done as part of log dev truncation
m_logdev.update_store_superblk(m_store_id, logstore_superblk{upto_seq_num + 1}, persist_now /* persist_now */);

const int ind{search_max_le(upto_seq_num)};
if (ind < 0) {
Expand Down
14 changes: 11 additions & 3 deletions src/homelogstore/log_store.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ struct log_dump_req {
std::shared_ptr< HomeLogStore > log_store; // if null all log stores are dumped
logstore_seq_num_t start_seq_num; // empty_key if from start of log file
logstore_seq_num_t end_seq_num; // empty_key if till last log entry
logstore_seq_num_t batch_size = 0; // Size of the output batch.
logstore_seq_num_t batch_size = 0; // Size of the output batch.
};

struct logstore_record {
Expand Down Expand Up @@ -94,7 +94,7 @@ struct logstore_req {
if (req->is_internal_req) { sisl::ObjectAllocator< logstore_req >::deallocate(req); }
}

friend class sisl::ObjectAllocator<logstore_req>;
friend class sisl::ObjectAllocator< logstore_req >;

private:
logstore_req() = default;
Expand Down Expand Up @@ -396,6 +396,14 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
*/
void truncate(const logstore_seq_num_t upto_seq_num, const bool in_memory_truncate_only = true);

/**
* @brief Truncate the logs for this log store upto the seq_num provided (inclusive) and persist the metablock
* before returning. If the next available slot is less than upto_seq_num, it will fill the gaps.
* This API makes sure the log idx is durable after the truncation.
* See above truncate method for more details on parameters
*/
void sync_truncate(const logstore_seq_num_t upto_seq_num, const bool in_memory_truncate_only = true);

/**
* @brief Fill the gap in the seq_num with a dummy value. This ensures that get_contiguous_issued and completed
* seq_num methods move forward. The filled data is not readable and any attempt to read this seq_num will
Expand Down Expand Up @@ -513,7 +521,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
void on_log_found(const logstore_seq_num_t seq_num, const logdev_key ld_key, const logdev_key flush_ld_key,
const log_buffer buf);
void on_batch_completion(const logdev_key& flush_batch_ld_key);
void do_truncate(const logstore_seq_num_t upto_seq_num);
void do_truncate(const logstore_seq_num_t upto_seq_num, const bool persist_now);
[[nodiscard]] int search_max_le(const logstore_seq_num_t input_sn);

private:
Expand Down
106 changes: 105 additions & 1 deletion src/homelogstore/tests/test_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@

#include "api/vol_interface.hpp"
#include "test_common/homestore_test_common.hpp"
#include "engine/common/homestore_flip.hpp"

#include "../log_store.hpp"

Expand Down Expand Up @@ -114,6 +115,9 @@ class SampleLogStoreClient {
m_log_store->register_log_found_cb(bind_this(SampleLogStoreClient::on_log_found, 3));
}

auto get_log_store() { return m_log_store; }
static void set_validate_on_log_found(const bool flag) { validate_on_log_found = flag; }

void reset_recovery() {
m_n_recovered_lsns = 0;
m_n_recovered_truncated_lsns = 0;
Expand Down Expand Up @@ -305,6 +309,7 @@ class SampleLogStoreClient {
}

void on_log_found(const logstore_seq_num_t lsn, const log_buffer buf, void* const ctx) {
if (!validate_on_log_found) { return; }
LOGDEBUG("Recovered lsn {}:{} with log data of size {}", m_log_store->get_store_id(), lsn, buf.size())
EXPECT_LE(lsn, m_cur_lsn.load()) << "Recovered incorrect lsn " << m_log_store->get_store_id() << ":" << lsn
<< "Expected less than cur_lsn " << m_cur_lsn.load();
Expand Down Expand Up @@ -370,6 +375,7 @@ class SampleLogStoreClient {
private:
static constexpr uint32_t max_data_size = 1024;
static uint64_t s_max_flush_multiple;
inline static bool validate_on_log_found = true;

logstore_id_t m_store_id;
test_log_store_comp_cb_t m_comp_cb;
Expand Down Expand Up @@ -482,7 +488,7 @@ class SampleDB {

if (SISL_OPTIONS.count("http_port")) {
test_common::set_fixed_http_port(SISL_OPTIONS["http_port"].as< uint32_t >());
}else {
} else {
test_common::set_random_http_port();
}
VolInterface::init(params, restart);
Expand Down Expand Up @@ -554,6 +560,8 @@ class SampleDB {
return m_highest_log_idx[fid].load();
}

std::vector< std::unique_ptr< SampleLogStoreClient > >& log_store_clients() { return m_log_store_clients; }

private:
const static std::string s_fpath_root;
std::vector< std::string > m_dev_names;
Expand Down Expand Up @@ -897,6 +905,102 @@ class LogStoreTest : public ::testing::Test {
uint32_t m_holes_per_batch{0};
};

static uint64_t curr_trunc_start(std::shared_ptr< HomeLogStore >& _log_store) {
return std::max(0l, _log_store->truncated_upto());
}

TEST_F(LogStoreTest, TruncateDurability) {
auto _hs_log_store =
SampleDB::instance()
.log_store_clients()
.emplace_back(std::make_unique< SampleLogStoreClient >(
HomeLogStoreMgr::CTRL_LOG_FAMILY_IDX, [](logstore_family_id_t, logstore_seq_num_t, logdev_key) {}))
.get()
->get_log_store();

auto const start_index = [](std::shared_ptr< HomeLogStore >& _log_store) {
return curr_trunc_start(_log_store) + 1;
};
auto const next_slot = [](std::shared_ptr< HomeLogStore >& _log_store) {
auto const last_idx = _log_store->get_contiguous_issued_seq_num(curr_trunc_start(_log_store));
EXPECT_TRUE(last_idx >= 0l);
return static_cast< uint64_t >(last_idx) + 1;
};

uint64_t last_idx{100};
for (auto i = 0u; i < last_idx; ++i) {
bool io_memory{false};
auto* const d{SampleLogStoreClient::prepare_data(i, io_memory)};

EXPECT_TRUE(_hs_log_store->write_sync(next_slot(_hs_log_store),
{reinterpret_cast< uint8_t* >(d), d->total_size(), false}));

if (io_memory) {
iomanager.iobuf_free(reinterpret_cast< uint8_t* >(d));
} else {
std::free(static_cast< void* >(d));
}
}
// ls range should be [1, 100]
EXPECT_EQ(start_index(_hs_log_store), 1ul);
EXPECT_EQ(next_slot(_hs_log_store), last_idx + 1);

// set flip to avoid logdev metablk persistance
#ifdef _PRERELEASE
flip::FlipClient* fc{HomeStoreFlip::client_instance()};
flip::FlipFrequency freq;
freq.set_count(1);
freq.set_percent(100);
fc->inject_noreturn_flip("logstore_test_skip_persist", {}, freq);
#endif

// fill gaps and truncate upto 150. ls is now [151, 151]
uint64_t truncate_upto{150};
for (auto curr_idx = next_slot(_hs_log_store); truncate_upto >= curr_idx; ++curr_idx) {
_hs_log_store->fill_gap(curr_idx);
}
_hs_log_store->truncate(truncate_upto);
// validate the satrt and end of the ls
EXPECT_EQ(start_index(_hs_log_store), truncate_upto + 1);
EXPECT_EQ(next_slot(_hs_log_store), truncate_upto + 1);

// restart homestore
const auto num_devs{SISL_OPTIONS["num_devs"].as< uint32_t >()}; // num devices
const auto dev_size_bytes{SISL_OPTIONS["dev_size_mb"].as< uint64_t >() * 1024 * 1024};
const auto num_threads{SISL_OPTIONS["num_threads"].as< uint32_t >()};
const auto num_logstores{SISL_OPTIONS["num_logstores"].as< uint32_t >() + 1};
SampleLogStoreClient::set_validate_on_log_found(false);
SampleDB::instance().start_homestore(num_devs, dev_size_bytes, num_threads, num_logstores, true /* restart */);

_hs_log_store = SampleDB::instance().log_store_clients().back()->get_log_store();
#ifdef _PRERELEASE
// We are simulating a crash by setting the logstore_test_skip_persist flip which does not persist metablk
// This will invalidate truncate call above and set logstore to [1 100]
EXPECT_EQ(1ul, start_index(_hs_log_store));
EXPECT_EQ(last_idx + 1, next_slot(_hs_log_store));
#endif

// fast_forward should be resilient to crashe and should be able to recover

uint64_t fast_forward_upto{350};
_hs_log_store->sync_truncate(fast_forward_upto);
#ifdef _PRERELEASE
flip::FlipFrequency freq1;
freq1.set_count(1);
freq1.set_percent(100);
fc->inject_noreturn_flip("logstore_test_skip_persist", {}, freq1);
#endif
EXPECT_EQ(start_index(_hs_log_store), fast_forward_upto + 1);
EXPECT_EQ(next_slot(_hs_log_store), fast_forward_upto + 1);

SampleDB::instance().start_homestore(num_devs, dev_size_bytes, num_threads, num_logstores, true /* restart */);
EXPECT_EQ(start_index(_hs_log_store), fast_forward_upto + 1);
EXPECT_EQ(next_slot(_hs_log_store), fast_forward_upto + 1);

EXPECT_TRUE(SampleDB::instance().delete_log_store(_hs_log_store->get_store_id()));
SampleLogStoreClient::set_validate_on_log_found(true);
}

TEST_F(LogStoreTest, BurstRandInsertThenTruncate) {
const auto num_records{SISL_OPTIONS["num_records"].as< uint32_t >()};
const auto iterations{SISL_OPTIONS["iterations"].as< uint32_t >()};
Expand Down

0 comments on commit b3d7e28

Please sign in to comment.