Skip to content

Commit

Permalink
Add INDEX FLIP and CP abrupt
Browse files Browse the repository at this point in the history
  • Loading branch information
shosseinimotlagh committed May 7, 2024
1 parent 12390e7 commit f41a4a4
Show file tree
Hide file tree
Showing 10 changed files with 139 additions and 12 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.3"
version = "6.4.4"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
12 changes: 8 additions & 4 deletions src/include/homestore/btree/detail/btree_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,13 @@ struct persistent_hdr_t {

// Default constructor: starts with zero entries, the leaf flag cleared and valid_node set.
persistent_hdr_t() : nentries{0}, leaf{0}, valid_node{1} {}
/// @brief Renders the header fields as a human-readable string (e.g. for logging).
///
/// NOTE(review): as written, the first `return` below always executes, so every statement after it is
/// unreachable. This looks like the previous and the new body of the function shown together -- confirm
/// which form is intended and drop the other one.
std::string to_string() const {
// Verbose form: one `name=value` pair for each header field.
return fmt::format("magic={} version={} csum={} node_id={} next_node={} nentries={} node_type={} is_leaf={} "
"valid_node={} node_gen={} link_version={} edge_nodeid={}, edge_link_version={} level={} ",
magic, version, checksum, node_id, next_node, nentries, node_type, leaf, valid_node,
node_gen, link_version, edge_info.m_bnodeid, edge_info.m_link_version, level);
// Compact form: "<node_id>.<link_version> level:<level> nEntries=<n> LEAF|INTERIOR [next] [edge] node_gen=<gen> ".
std::string sleaf = leaf ? "LEAF" : "INTERIOR";
// next_node is printed only when it is not empty_bnodeid.
std::string snext = next_node == empty_bnodeid ? "" : fmt::format("next_node={}", next_node);
// Edge info is printed as "edge:<node id>.<link version>", and only when the edge node id is not empty_bnodeid.
std::string edge = edge_info.m_bnodeid == empty_bnodeid
? ""
: "edge:" + std::to_string(edge_info.m_bnodeid) + "." + std::to_string(edge_info.m_link_version);
return fmt::format("{}.{} level:{} nEntries={} {} {} {} node_gen={} ", node_id, link_version, level, nentries,
sleaf, snext, edge, node_gen);
}
};
#pragma pack()
Expand Down Expand Up @@ -104,6 +107,7 @@ class BtreeNode : public sisl::ObjLifeCounter< BtreeNode > {

/// @brief Tells whether a raw node buffer holds a leaf node.
/// Only the persistent_hdr_t that the buffer pointer is reinterpreted as gets read.
/// @param buf raw node buffer
/// @return the header's `leaf` flag
static bool identify_leaf_node(uint8_t* buf) {
    const auto* hdr = r_cast< persistent_hdr_t* >(buf);
    return hdr->leaf;
}
/// @brief Formats the persistent_hdr_t that a raw node buffer is reinterpreted as.
/// @param buf raw node buffer
/// @return whatever persistent_hdr_t::to_string() produces for that header
static std::string to_string_buf(uint8_t* buf) {
    const auto* hdr = r_cast< persistent_hdr_t* >(buf);
    return hdr->to_string();
}

/// @brief Finds the index of the entry with the specified key in the node.
///
Expand Down
3 changes: 3 additions & 0 deletions src/include/homestore/checkpoint/cp.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ struct CP {
cp_id_t m_cp_id;
std::array< std::unique_ptr< CPContext >, (size_t)cp_consumer_t::SENTINEL > m_contexts;
folly::SharedPromise< bool > m_comp_promise;
#ifdef _PRERELEASE
// Pre-release builds only (_PRERELEASE). Starts false; CPContext::abrupt() stores true into it.
std::atomic<bool> m_abrupt_cp{false};
#endif

public:
// Stores the given CPManager pointer in m_cp_mgr; nothing else is initialized here.
CP(CPManager* mgr) : m_cp_mgr{mgr} {}
Expand Down
6 changes: 6 additions & 0 deletions src/include/homestore/checkpoint/cp_mgr.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class CPContext {
// Returns the CP pointer held by this context (m_cp).
CP* cp() { return m_cp; }
cp_id_t id() const;
// Publishes `status` through m_flush_comp; the value is observed via the future handed out by get_future().
void complete(bool status) { m_flush_comp.setValue(status); }
#ifdef _PRERELEASE
/// @brief Marks the CP behind this context as abruptly ended, then completes this context with `true`.
void abrupt() {
    auto& abrupt_flag = m_cp->m_abrupt_cp;
    abrupt_flag.store(true);
    complete(true);
}
#endif
// Returns a future obtained from m_flush_comp, the same object complete() sets the value on.
folly::Future< bool > get_future() { return m_flush_comp.getFuture(); }

virtual ~CPContext() = default;
Expand Down
39 changes: 38 additions & 1 deletion src/include/homestore/homestore.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,31 @@ struct HS_SERVICE {
}
};

/// Bitmask of named flip points.
/// Every INDEX_* constant is a distinct bit; `flips` is the OR of the bits that have been set.
struct HOMESTORE_FLIPS {
    static constexpr uint32_t INDEX_PARENT_NON_ROOT = 1 << 0;
    static constexpr uint32_t INDEX_PARENT_ROOT = 1 << 1;
    static constexpr uint32_t INDEX_LEFT_SIBLING = 1 << 2;
    static constexpr uint32_t INDEX_RIGHT_SIBLING = 1 << 3;

    uint32_t flips;

    HOMESTORE_FLIPS() : flips{0} {}

    /// @return the names of all set bits in ascending bit order, each one followed by a comma
    ///         (a non-empty result therefore ends with ','); an empty string when no bit is set.
    std::string list() const {
        std::string names;
        for (const auto& known : s_known_flips) {
            if ((flips & known.bit) != 0) {
                names += known.name;
                names += ",";
            }
        }
        return names;
    }

    /// Sets the given bit(s) in addition to the ones already set.
    void set_flip(uint32_t flip) { flips |= flip; }

    /// Sets the bit whose name equals `flip`; a name that matches none of the known flips is ignored.
    void set_flip(std::string flip) {
        for (const auto& known : s_known_flips) {
            if (flip == known.name) {
                set_flip(known.bit);
                return; // names are unique, nothing further can match
            }
        }
    }

private:
    // One row per known flip, so that list() and set_flip(std::string) share a single name<->bit mapping.
    struct named_flip {
        const char* name;
        uint32_t bit;
    };
    static constexpr named_flip s_known_flips[] = {
        {"INDEX_PARENT_NON_ROOT", INDEX_PARENT_NON_ROOT},
        {"INDEX_PARENT_ROOT", INDEX_PARENT_ROOT},
        {"INDEX_LEFT_SIBLING", INDEX_LEFT_SIBLING},
        {"INDEX_RIGHT_SIBLING", INDEX_RIGHT_SIBLING},
    };
};

/*
* IO errors handling by homestore.
* Write error :- Reason :- Disk error, space full,btree node read fail
Expand Down Expand Up @@ -169,7 +194,19 @@ class HomeStore {
// Returns a reference to the object m_resource_mgr points at (no null check is done here).
ResourceMgr& resource_mgr() { return *m_resource_mgr.get(); }
// Returns a reference to the object m_cp_mgr points at (no null check is done here).
CPManager& cp_mgr() { return *m_cp_mgr.get(); }
// Returns a copy of the m_evictor handle.
shared< sisl::Evictor > evictor() { return m_evictor; }

#ifdef _PRERELEASE
/// @brief Records one named flip point in m_flips (pre-release builds only).
/// @param flip flip name; it is handed to HOMESTORE_FLIPS::set_flip(std::string), which ignores names it
///             does not know. Taken by const reference so the caller's string is not copied on the way in.
void set_flip_point(const std::string& flip) { m_flips.set_flip(flip); }
/// @brief Records every flip point named in `flips` by calling set_flip_point() on each entry, in order.
/// @param flips flip names; taken by const reference so the whole vector is not copied per call.
void set_flips(const std::vector< std::string >& flips) {
    for (const auto& flip : flips) {
        set_flip_point(flip);
    }
}
#endif
#ifdef _PRERELEASE
// Flip points recorded through set_flip_point()/set_flips().
// NOTE(review): declared ahead of the `private:` label that follows, so it may be publicly accessible -- confirm
// that is intended.
HOMESTORE_FLIPS m_flips;
#endif
private:
void init_cache();
shared< VirtualDev > create_vdev_cb(const vdev_info& vinfo, bool load_existing);
Expand Down
25 changes: 24 additions & 1 deletion src/include/homestore/index/index_table.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <homestore/index_service.hpp>
#include <homestore/checkpoint/cp_mgr.hpp>
#include <homestore/index/wb_cache_base.hpp>
#include <iomgr/iomgr_flip.hpp>

SISL_LOGGING_DECL(wbcache)

Expand Down Expand Up @@ -147,6 +148,11 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
for (const auto& right_child_node : new_nodes) {
auto right_child = IndexBtreeNode::convert(right_child_node.get());
write_node_impl(right_child_node, context);
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("INDEX_RIGHT_SIBLING")) {
index_service().wb_cache().add_to_crashing_buffers(right_child->m_idx_buf, "INDEX_RIGHT_SIBLING");
}
#endif
wb_cache().prepend_to_chain(right_child->m_idx_buf, left_child_buf);
}

Expand All @@ -160,11 +166,28 @@ class IndexTable : public IndexTableBase, public Btree< K, V > {
}
return str;
};

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("INDEX_LEFT_SIBLING")) {
index_service().wb_cache().add_to_crashing_buffers(left_child_idx_node->m_idx_buf, "INDEX_LEFT_SIBLING");
}
#endif
LOGTRACEMOD(wbcache, "{}", trace_index_bufs());
write_node_impl(left_child_node, context);
write_node_impl(parent_node, context);

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("INDEX_PARENT_NON_ROOT")) {
if(parent_node->node_id()!= this->root_node_id()){
index_service().wb_cache().add_to_crashing_buffers(parent_idx_node->m_idx_buf, "INDEX_PARENT_NON_ROOT");
}
}
if (iomgr_flip::instance()->test_flip("INDEX_PARENT_ROOT")) {
if(parent_node->node_id()== this->root_node_id()) {
index_service().wb_cache().add_to_crashing_buffers(left_child_idx_node->m_idx_buf, "INDEX_PARENT_ROOT");
}
}
#endif

return btree_status_t::success;
}

Expand Down
4 changes: 4 additions & 0 deletions src/include/homestore/index/wb_cache_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ class IndexWBCacheBase {
/// @param cur_buf
/// @return
virtual IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* context) const = 0;

#ifdef _PRERELEASE
/// @brief Registers an index buffer together with a textual reason (pre-release builds only).
/// @param reason label stored for the buffer; the callers in IndexTable pass the name of the flip that fired
virtual void add_to_crashing_buffers(IndexBufferPtr, std::string reason) = 0;
#endif
};

} // namespace homestore
27 changes: 26 additions & 1 deletion src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ void IndexWBCache::read_buf(bnodeid_t id, BtreeNodePtr& node, node_initializer_t
goto retry;
}
}

#ifdef _PRERELEASE
/// @brief Appends `reason` to the list of reasons kept for `buf` in crashing_buffers.
/// The entry for `buf` is created on first use. flip_mtx is held for the duration of the update.
/// NOTE(review): in the part of do_flush_one_buf() visible in this change, crashing_buffers is searched and
/// cleared without taking flip_mtx -- confirm the two paths cannot run concurrently.
void IndexWBCache::add_to_crashing_buffers(IndexBufferPtr buf, std::string reason) {
    std::lock_guard< std::mutex > guard{flip_mtx};
    auto& reasons = crashing_buffers[buf];
    reasons.push_back(reason);
}
#endif
std::pair< bool, bool > IndexWBCache::create_chain(IndexBufferPtr& second, IndexBufferPtr& third, CPContext* cp_ctx) {
bool second_copied{false}, third_copied{false};
auto chain = second;
Expand Down Expand Up @@ -241,6 +246,26 @@ folly::Future< bool > IndexWBCache::async_cp_flush(IndexCPContext* cp_ctx) {
void IndexWBCache::do_flush_one_buf(IndexCPContext* cp_ctx, IndexBufferPtr buf, bool part_of_batch) {
LOGTRACEMOD(wbcache, "cp {} buf {}", cp_ctx->id(), buf->to_string());
buf->set_state(index_buf_state_t::FLUSHING);

#ifdef _PRERELEASE

if (m_abrupt_flush.load() == true) {
LOGTRACEMOD(wbcache, "The cp {} is abrupt! for {}", cp_ctx->id(), BtreeNode::to_string_buf(buf->raw_buffer()));
return;
}
if (auto it = crashing_buffers.find(buf);it != crashing_buffers.end()) {
m_abrupt_flush.store(true);
const auto& reasons = it->second;
std::string formatted_reasons = fmt::format("[{}]", fmt::join(reasons, ", "));
LOGTRACEMOD(wbcache, "Buffer {} is in crashing_buffers with reason(s): {} - Buffer info: {}",
buf->to_string(), formatted_reasons, BtreeNode::to_string_buf(buf->raw_buffer()));
crashing_buffers.clear();
cp_ctx->abrupt();
return;
}
#endif
LOGTRACEMOD(wbcache, "flushing cp {} buf {} info: {}", cp_ctx->id(), buf->to_string(),
BtreeNode::to_string_buf(buf->raw_buffer()));
m_vdev->async_write(r_cast< const char* >(buf->raw_buffer()), m_node_size, buf->m_blkid, part_of_batch)
.thenValue([buf, cp_ctx](auto) {
auto& pthis = s_cast< IndexWBCache& >(wb_cache()); // Avoiding more than 16 bytes capture
Expand Down
12 changes: 10 additions & 2 deletions src/lib/index/wb_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,12 @@ class IndexWBCache : public IndexWBCacheBase {
std::vector< iomgr::io_fiber_t > m_cp_flush_fibers;
std::mutex m_flush_mtx;

#ifdef _PRERELEASE
// Starts false; do_flush_one_buf() sets it when it meets a registered crashing buffer, and from then on
// returns early (before issuing the write) for every buffer it is called with.
std::atomic< bool > m_abrupt_flush = false;
// Held by add_to_crashing_buffers() while it modifies crashing_buffers.
std::mutex flip_mtx;
// Registered buffers, each mapped to the list of reasons it was registered with.
std::map< IndexBufferPtr, std::vector< std::string > > crashing_buffers;
#endif

public:
IndexWBCache(const std::shared_ptr< VirtualDev >& vdev, const std::shared_ptr< sisl::Evictor >& evictor,
uint32_t node_size);
Expand All @@ -55,8 +61,10 @@ class IndexWBCache : public IndexWBCacheBase {

//////////////////// CP Related API section /////////////////////////////////
folly::Future< bool > async_cp_flush(IndexCPContext* context);
IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext *cp_ctx) const;

IndexBufferPtr copy_buffer(const IndexBufferPtr& cur_buf, const CPContext* cp_ctx) const;
#ifdef _PRERELEASE
// Overrides IndexWBCacheBase::add_to_crashing_buffers; records the reason string for the buffer in crashing_buffers.
void add_to_crashing_buffers(IndexBufferPtr, std::string) override;
#endif
private:
void start_flush_threads();
void process_write_completion(IndexCPContext* cp_ctx, IndexBufferPtr pbuf);
Expand Down
21 changes: 19 additions & 2 deletions src/tests/test_common/homestore_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <homestore/checkpoint/cp_mgr.hpp>
#include <iomgr/iomgr_config_generated.h>
#include <common/homestore_assert.hpp>
#include "common/homestore_flip.hpp"

const std::string SPDK_ENV_VAR_STRING{"USER_WANT_SPDK"};
const std::string HTTP_SVC_ENV_VAR_STRING{"USER_WANT_HTTP_OFF"};
Expand All @@ -53,7 +54,9 @@ SISL_OPTION_GROUP(
::cxxopts::value< int >()->default_value("-1"), "number"),
(num_io, "", "num_io", "number of IO operations", ::cxxopts::value< uint64_t >()->default_value("300"), "number"),
(qdepth, "", "qdepth", "Max outstanding operations", ::cxxopts::value< uint32_t >()->default_value("8"), "number"),
(spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"));
(spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"),
(flip_list, "", "flip_list", "btree flip list", ::cxxopts::value< std::vector< std::string > >(), "flips [...]"),
(enable_crash, "", "enable_crash", "enable crash", ::cxxopts::value< bool >()->default_value("0"), ""));

SETTINGS_INIT(iomgrcfg::IomgrSettings, iomgr_config);

Expand Down Expand Up @@ -174,7 +177,21 @@ class HSTestHelper {
uint64_t min_chunk_size{0};
vdev_size_type_t vdev_size_type{vdev_size_type_t::VDEV_SIZE_STATIC};
};

#ifdef _PRERELEASE
// Flip client constructed from HomeStoreFlip::instance(); set_flip_point() injects flips through it.
flip::FlipClient m_fc{HomeStoreFlip::instance()};
#endif
#ifdef _PRERELEASE
/// @brief Arms a flip point by name: injects it through the flip client and records it on the HomeStore instance.
/// @param flip_name name of the flip to arm; taken by const reference to avoid copying the string per call
void set_flip_point(const std::string& flip_name) {
    flip::FlipCondition null_cond;
    flip::FlipFrequency freq;
    // count=10000, percent=100 -- presumably "fire every time, for up to 10000 hits"; confirm against the
    // flip library's FlipFrequency semantics.
    freq.set_count(10000);
    freq.set_percent(100);
    m_fc.inject_noreturn_flip(flip_name, {null_cond}, freq);
    // Mirror the flip on the HomeStore instance as well.
    auto hsi = HomeStore::instance();
    hsi->set_flip_point(flip_name);
    LOGDEBUG("Flip {} set", flip_name);
}
#endif
static void start_homestore(const std::string& test_name, std::map< uint32_t, test_params >&& svc_params,
hs_before_services_starting_cb_t cb = nullptr, bool fake_restart = false,
bool init_device = true, uint32_t shutdown_delay_sec = 5) {
Expand Down

0 comments on commit f41a4a4

Please sign in to comment.