Skip to content

Commit

Permalink
Add INDEX FLIP and CP abrupt
Browse files Browse the repository at this point in the history
  • Loading branch information
shosseinimotlagh committed May 9, 2024
1 parent 74ca779 commit 30e8981
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 14 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.4"
version = "6.4.5"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
39 changes: 38 additions & 1 deletion src/include/homestore/btree/btree.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,31 @@ struct BtreeThreadVariables {
BtreeNodePtr force_split_node{nullptr};
};

struct BTREE_FLIPS {
static constexpr uint32_t INDEX_PARENT_NON_ROOT = 1 << 0;
static constexpr uint32_t INDEX_PARENT_ROOT = 1 << 1;
static constexpr uint32_t INDEX_LEFT_SIBLING = 1 << 2;
static constexpr uint32_t INDEX_RIGHT_SIBLING = 1 << 3;

uint32_t flips;
BTREE_FLIPS() : flips{0} {}
std::string list() const {
std::string str;
if (flips & INDEX_PARENT_NON_ROOT) { str += "index_parent_non_root,"; }
if (flips & INDEX_PARENT_ROOT) { str += "index_parent_root,"; }
if (flips & INDEX_LEFT_SIBLING) { str += "index_left_sibling,"; }
if (flips & INDEX_RIGHT_SIBLING) { str += "index_right_sibling,"; }
return str;
}
void set_flip(uint32_t flip) { flips |= flip; }
void set_flip(std::string flip) {
if (flip == "index_parent_non_root") { set_flip(INDEX_PARENT_NON_ROOT); }
if (flip == "index_parent_root") { set_flip(INDEX_PARENT_ROOT); }
if (flip == "index_left_sibling") { set_flip(INDEX_LEFT_SIBLING); }
if (flip == "index_right_sibling") { set_flip(INDEX_RIGHT_SIBLING); }
}
};

template < typename K, typename V >
class Btree {
private:
Expand All @@ -52,7 +77,9 @@ class Btree {
#ifndef NDEBUG
std::atomic< uint64_t > m_req_id{0};
#endif

#ifdef _PRERELEASE
BTREE_FLIPS m_flips;
#endif
// This workaround of BtreeThreadVariables is needed instead of directly declaring statics
// to overcome the gcc bug, pointer here: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=66944
static BtreeThreadVariables* bt_thread_vars() {
Expand Down Expand Up @@ -100,6 +127,16 @@ class Btree {

// static void set_io_flip();
// static void set_error_flip();
#ifdef _PRERELEASE
void set_flip_point(std::string flip) {
m_flips.set_flip(flip);
}
void set_flips(std::vector< std::string > flips) {
for (const auto& flip : flips) {
set_flip_point(flip);
}
}
#endif

protected:
/////////////////////////// Methods the underlying store is expected to handle ///////////////////////////
Expand Down
12 changes: 8 additions & 4 deletions src/include/homestore/btree/detail/btree_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,13 @@ struct persistent_hdr_t {

persistent_hdr_t() : nentries{0}, leaf{0}, valid_node{1} {}
std::string to_string() const {
return fmt::format("magic={} version={} csum={} node_id={} next_node={} nentries={} node_type={} is_leaf={} "
"valid_node={} node_gen={} link_version={} edge_nodeid={}, edge_link_version={} level={} ",
magic, version, checksum, node_id, next_node, nentries, node_type, leaf, valid_node,
node_gen, link_version, edge_info.m_bnodeid, edge_info.m_link_version, level);
std::string sleaf = leaf ? "LEAF" : "INTERIOR";
std::string snext = next_node == empty_bnodeid ? "" : fmt::format("next_node={}", next_node);
std::string edge = edge_info.m_bnodeid == empty_bnodeid
? ""
: "edge:" + std::to_string(edge_info.m_bnodeid) + "." + std::to_string(edge_info.m_link_version);
return fmt::format("magic={} version={} csum={} node: {}.{} level:{} nEntries={} {} {} {} node_gen={} ", magic,
version, checksum, node_id, link_version, level, nentries, sleaf, snext, edge, node_gen);
}
};
#pragma pack()
Expand Down Expand Up @@ -104,6 +107,7 @@ class BtreeNode : public sisl::ObjLifeCounter< BtreeNode > {

// Identify if a node is a leaf node or not, from raw buffer, by just reading persistent_hdr_t
static bool identify_leaf_node(uint8_t* buf) { return (r_cast< persistent_hdr_t* >(buf))->leaf; }
static std::string to_string_buf(uint8_t* buf) { return (r_cast< persistent_hdr_t* >(buf))->to_string(); }

/// @brief Finds the index of the entry with the specified key in the node.
///
Expand Down
3 changes: 3 additions & 0 deletions src/include/homestore/checkpoint/cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ struct CP {
cp_id_t m_cp_id;
std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts;
folly::SharedPromise< bool > m_comp_promise;
#ifdef _PRERELEASE
std::atomic<bool> m_abrupt_cp{false};
#endif

public:
CP(CPManager* mgr) : m_cp_mgr{mgr} {}
Expand Down
9 changes: 9 additions & 0 deletions src/include/homestore/checkpoint/cp_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ class CPContext {
CP* cp() { return m_cp; }
cp_id_t id() const;
void complete(bool status) { m_flush_comp.setValue(status); }
#ifdef _PRERELEASE
void abrupt() {
m_cp->m_abrupt_cp.store(true);
complete(true);
}
bool is_abrupt() {
return m_cp->m_abrupt_cp.load();
}
#endif
folly::Future< bool > get_future() { return m_flush_comp.getFuture(); }

virtual ~CPContext() = default;
Expand Down
1 change: 0 additions & 1 deletion src/include/homestore/homestore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,6 @@ class HomeStore {
ResourceMgr& resource_mgr() { return *m_resource_mgr.get(); }
CPManager& cp_mgr() { return *m_cp_mgr.get(); }
shared< sisl::Evictor > evictor() { return m_evictor; }

private:
void init_cache();
shared< VirtualDev > create_vdev_cb(const vdev_info& vinfo, bool load_existing);
Expand Down
25 changes: 24 additions & 1 deletion src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <homestore/index_service.hpp>
#include <homestore/checkpoint/cp_mgr.hpp>
#include <homestore/index/wb_cache_base.hpp>
#include <iomgr/iomgr_flip.hpp>

SISL_LOGGING_DECL(wbcache)

Expand Down Expand Up @@ -147,6 +148,11 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
write_node_impl(right_child_node, context);
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("index_right_sibling")) {
index_service().wb_cache().add_to_crashing_buffers(right_child->m_idx_buf, "index_right_sibling");
}
#endif
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
}

Expand All @@ -160,11 +166,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}
return str;
};

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("index_left_sibling")) {
index_service().wb_cache().add_to_crashing_buffers(left_child_idx_node->m_idx_buf, "index_left_sibling");
}
#endif
LOGTRACEMOD(wbcache, "{}", trace_index_bufs());
write_node_impl(left_child_node, context);
write_node_impl(parent_node, context);

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("index_parent_non_root")) {
if(parent_node->node_id()!= this->root_node_id()){
index_service().wb_cache().add_to_crashing_buffers(parent_idx_node->m_idx_buf, "index_parent_non_root");
}
}
if (iomgr_flip::instance()->test_flip("index_parent_root")) {
if(parent_node->node_id()== this->root_node_id()) {
index_service().wb_cache().add_to_crashing_buffers(left_child_idx_node->m_idx_buf, "index_parent_root");
}
}
#endif

return btree_status_t::success;
}

Expand Down
8 changes: 8 additions & 0 deletions src/include/homestore/index/wb_cache_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ class IndexWBCacheBase {
/// @param cur_buf
/// @return
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0;

#ifdef _PRERELEASE
/// @brief add the buffer to crashing buffers list. In transact_write_nodes(), it will be called. During flushing
/// wbcache, it will be checked. If the buffer is in the list, cp_abrupt happens.
/// @param first index buffer
/// @param second the reason
virtual void add_to_crashing_buffers(IndexBufferPtr, std::string reason) = 0;
#endif
};

} // namespace homestore
26 changes: 25 additions & 1 deletion src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ void IndexWBCache::read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t
goto retry;
}
}

#ifdef _PRERELEASE
void IndexWBCache::add_to_crashing_buffers(IndexBufferPtr buf, std::string reason) {
std::unique_lock lg(flip_mtx);
this->crashing_buffers[buf].push_back(reason);
}
#endif
std::pair< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) {
bool second_copied{false}, third_copied{false};
auto chain = second;
Expand Down Expand Up @@ -241,6 +246,25 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) {
void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) {
LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string());
buf->set_state(index_buf_state_t::FLUSHING);

#ifdef _PRERELEASE

if (cp_ctx->is_abrupt()) {
LOGTRACEMOD(wbcache, "The cp {} is abrupt! for {}", cp_ctx->id(), BtreeNode::to_string_buf(buf->raw_buffer()));
return;
}
if (auto it = crashing_buffers.find(buf);it != crashing_buffers.end()) {
const auto& reasons = it->second;
std::string formatted_reasons = fmt::format("[{}]", fmt::join(reasons, ", "));
LOGTRACEMOD(wbcache, "Buffer {} is in crashing_buffers with reason(s): {} - Buffer info: {}",
buf->to_string(), formatted_reasons, BtreeNode::to_string_buf(buf->raw_buffer()));
crashing_buffers.clear();
cp_ctx->abrupt();
return;
}
#endif
LOGTRACEMOD(wbcache, "flushing cp {} buf {} info: {}", cp_ctx->id(), buf->to_string(),
BtreeNode::to_string_buf(buf->raw_buffer()));
m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch)
.thenValue([buf, cp_ctx](auto) {
auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture
Expand Down
11 changes: 9 additions & 2 deletions src/lib/index/wb_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class IndexWBCache : public IndexWBCacheBase {
std::vector< iomgr::io_fiber_t > m_cp_flush_fibers;
std::mutex m_flush_mtx;

#ifdef _PRERELEASE
std::mutex flip_mtx;
std::map< IndexBufferPtr, std::vector< std::string > > crashing_buffers;
#endif

public:
IndexWBCache(const std::shared_ptr< VirtualDev >& vdev, const std::shared_ptr< sisl::Evictor >& evictor,
uint32_t node_size);
Expand All @@ -55,8 +60,10 @@ class IndexWBCache : public IndexWBCacheBase {

//////////////////// CP Related API section /////////////////////////////////
folly::Future< bool > async_cp_flush(IndexCPContext* context);
IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext *cp_ctx) const;

IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const;
#ifdef _PRERELEASE
void add_to_crashing_buffers(IndexBufferPtr, std::string) override;
#endif
private:
void start_flush_threads();
void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf);
Expand Down
16 changes: 15 additions & 1 deletion src/tests/btree_helpers/btree_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <sisl/options/options.h>
#include <sisl/logging/logging.h>
#include <sisl/utility/enum.hpp>
#include <iomgr/iomgr_flip.hpp>
#include <boost/algorithm/string.hpp>

#include <homestore/btree/mem_btree.hpp>
Expand Down Expand Up @@ -80,8 +81,21 @@ struct BtreeTestHelper {
std::condition_variable m_test_done_cv;
std::random_device m_re;
std::atomic< uint32_t > m_num_ops{0};

#ifdef _PRERELEASE
flip::FlipClient m_fc{iomgr_flip::instance()};
#endif
public:
#ifdef _PRERELEASE
void set_flip_point(const std::string flip_name) {
flip::FlipCondition null_cond;
flip::FlipFrequency freq;
freq.set_count(10000);
freq.set_percent(1);
m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
m_bt->set_flip_point(flip_name);
LOGDEBUG("Flip {} set", flip_name);
}
#endif
void preload(uint32_t preload_size) {
if (preload_size) {
const auto n_fibers = std::min(preload_size, (uint32_t)m_fibers.size());
Expand Down
5 changes: 3 additions & 2 deletions src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ SISL_OPTION_GROUP(
::cxxopts::value< int >()->default_value("-1"), "number"),
(num_io, "", "num_io", "number of IO operations", ::cxxopts::value< uint64_t >()->default_value("300"), "number"),
(qdepth, "", "qdepth", "Max outstanding operations", ::cxxopts::value< uint32_t >()->default_value("8"), "number"),
(spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"));
(spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"),
(flip_list, "", "flip_list", "btree flip list", ::cxxopts::value< std::vector< std::string > >(), "flips [...]"),
(enable_crash, "", "enable_crash", "enable crash", ::cxxopts::value< bool >()->default_value("0"), ""));

SETTINGS_INIT(iomgrcfg::IomgrSettings, iomgr_config);

Expand Down Expand Up @@ -174,7 +176,6 @@ class HSTestHelper {
uint64_t min_chunk_size{0};
vdev_size_type_t vdev_size_type{vdev_size_type_t::VDEV_SIZE_STATIC};
};

static void start_homestore(const std::string& test_name, std::map< uint32_t, test_params >&& svc_params,
hs_before_services_starting_cb_t cb = nullptr, bool fake_restart = false,
bool init_device = true, uint32_t shutdown_delay_sec = 5) {
Expand Down

0 comments on commit 30e8981

Please sign in to comment.