Skip to content

Commit

Permalink
recover metablk in subtype order (#200)
Browse files Browse the repository at this point in the history
Signed-off-by: Jie Yao <[email protected]>
  • Loading branch information
JacksonYao287 authored Nov 7, 2023
1 parent 9dc9130 commit 3911682
Show file tree
Hide file tree
Showing 8 changed files with 324 additions and 82 deletions.
10 changes: 9 additions & 1 deletion src/include/homestore/meta_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
#include <cstdint>
#include <functional>
#include <map>
#include <unordered_map>
#include <memory>
#include <mutex>
#include <string>
#include <system_error>
#include <vector>
#include <optional>

#include <sisl/fds/buffer.hpp>
#include <sisl/metrics/metrics.hpp>
Expand All @@ -46,10 +48,12 @@ struct vdev_info;
// new blk found subsystem callback
typedef std::function< void(meta_blk* mblk, sisl::byte_view buf, size_t size) > meta_blk_found_cb_t;
typedef std::string meta_sub_type;
typedef std::vector< meta_sub_type > meta_subtype_vec_t;
typedef std::function< void(bool success) > meta_blk_recover_comp_cb_t; // recover complete subsystem callbacks;
typedef std::map< uint64_t, meta_blk* > meta_blk_map_t; // blkid to meta_blk map;
typedef std::map< uint64_t, meta_blk_ovf_hdr* > ovf_hdr_map_t; // ovf_blkid to ovf_blk_hdr map;
typedef std::map< meta_sub_type, MetaSubRegInfo > client_info_map_t; // client information map;
typedef std::unordered_map< meta_sub_type, std::vector< meta_sub_type > > subtype_graph_t;

class MetablkMetrics : public sisl::MetricsGroupWrapper {
public:
Expand Down Expand Up @@ -87,6 +91,7 @@ class MetaBlkService {
MetablkMetrics m_metrics;
bool m_inited{false};
std::unique_ptr< meta_vdev_context > m_meta_vdev_context;
subtype_graph_t m_dep_topo_graph;

public:
MetaBlkService(const char* name = "MetaBlkStore");
Expand Down Expand Up @@ -121,7 +126,7 @@ class MetaBlkService {
* @param cb : subsystem cb
*/
void register_handler(meta_sub_type type, const meta_blk_found_cb_t& cb, const meta_blk_recover_comp_cb_t& comp_cb,
bool do_crc = true);
bool do_crc = true, std::optional< meta_subtype_vec_t > deps = std::nullopt);

/**
* @brief
Expand Down Expand Up @@ -357,6 +362,9 @@ class MetaBlkService {
bool scan_and_load_meta_blks(meta_blk_map_t& meta_blks, ovf_hdr_map_t& ovf_blk_hdrs, BlkId* last_mblk_id,
client_info_map_t& sub_info);

void recover_meta_block(meta_blk* meta_block);
void recover_meta_sub_type(bool do_comp_cb, const meta_sub_type&);

public:
bool verify_metablk_store();

Expand Down
2 changes: 1 addition & 1 deletion src/lib/blkdata_svc/blk_read_tracker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class BlkReadTracker {
/**
* @brief : decrease the reference count of the BlkId by 1 in this read tracker.
* If the ref count drops to zero, it means no read is pending on this blkid and if there is a waiter on this blkid,
* callback should be trigggered and all entries associated with this blkid (there could be more than one
* callback should be triggered and all entries associated with this blkid (there could be more than one
* sub_ranges) should be removed.
*
* @param blkid : blkid that is being dereferneced;
Expand Down
36 changes: 36 additions & 0 deletions src/lib/common/homestore_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,41 @@ sisl::byte_array hs_utils::extract_byte_array(const sisl::byte_view& b, const bo
return (is_aligned_needed) ? b.extract(alignment) : b.extract(0);
};

bool hs_utils::topological_sort(std::unordered_map< std::string, std::vector< std::string > >& DAG,
std::vector< std::string >& ordered_entries) {
std::unordered_map< std::string, int > in_degree;
std::queue< std::string > q;

// Calculate in-degree of each vertex
for (const auto& [vertex, edges] : DAG) {
// we should make sure all the vertex in in_degree map;
// if vertex is not in the map, 0 will be assigned.
in_degree[vertex];
for (const auto& edge : edges) {
in_degree[edge]++;
}
}

// Add vertices with in-degree 0 to the queue
for (const auto& [vertex, degree] : in_degree) {
if (degree == 0) q.push(vertex);
}

// Process vertices in the queue
while (!q.empty()) {
const auto vertex = q.front();
q.pop();
ordered_entries.push_back(vertex);

for (const auto& edge : DAG[vertex]) {
in_degree[edge]--;
if (in_degree[edge] == 0) { q.push(edge); }
}
}

// Check for cycle
return ordered_entries.size() != DAG.size();
}

size_t hs_utils::m_btree_mempool_size;
} // namespace homestore
8 changes: 8 additions & 0 deletions src/lib/common/homestore_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,13 @@ class hs_utils {
static sisl::byte_array make_byte_array(const uint64_t size, const bool is_aligned_needed, const sisl::buftag tag,
const size_t alignment);
static uuid_t gen_random_uuid();

/**
* @brief given a DAG graph , build the partial order sequence.
*
* @return true if the DAG has a circle ,or false if not.
*/
static bool topological_sort(std::unordered_map< std::string, std::vector< std::string > >& DAG,
std::vector< std::string >& ordered_entries);
};
} // namespace homestore
8 changes: 4 additions & 4 deletions src/lib/homestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,16 +226,16 @@ void HomeStore::do_start() {
void HomeStore::init_done() { m_init_done = true; }

void HomeStore::shutdown() {
if (!m_init_done) {
if (!m_init_done) {
LOGWARN("Homestore shutdown is called before init is completed");
return;
return;
}

LOGINFO("Homestore shutdown is started");

if (has_index_service()) {
m_index_service->stop();
// m_index_service.reset();
// m_index_service.reset();
}

if (has_log_service()) {
Expand Down
165 changes: 96 additions & 69 deletions src/lib/meta/meta_blk_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,16 @@ void MetaBlkService::deregister_handler(meta_sub_type type) {
const auto it = m_sub_info.find(type);
if (it != std::end(m_sub_info)) {
m_sub_info.erase(it);
m_dep_topo_graph.erase(type);
HS_LOG(INFO, metablk, "[type={}] deregistered Successfully", type);
} else {
HS_LOG(INFO, metablk, "[type={}] not found in registered list, no-op", type);
}
}

void MetaBlkService::register_handler(meta_sub_type type, const meta_blk_found_cb_t& cb,
const meta_blk_recover_comp_cb_t& comp_cb, bool do_crc) {
const meta_blk_recover_comp_cb_t& comp_cb, bool do_crc,
std::optional< meta_subtype_vec_t > deps) {
std::lock_guard< decltype(m_meta_mtx) > lk(m_meta_mtx);
HS_REL_ASSERT_LT(type.length(), MAX_SUBSYS_TYPE_LEN, "type len: {} should not exceed len: {}", type.length(),
MAX_SUBSYS_TYPE_LEN);
Expand All @@ -375,7 +377,13 @@ void MetaBlkService::register_handler(meta_sub_type type, const meta_blk_found_c
m_sub_info[type].cb = cb;
m_sub_info[type].comp_cb = comp_cb;
m_sub_info[type].do_crc = do_crc ? 1 : 0;
HS_LOG(INFO, metablk, "[type={}] registered with do_crc: {}", type, do_crc);
if (deps.has_value()) {
m_sub_info[type].has_deps = true;
for (auto const& x : deps.value()) {
m_sub_info[x].has_deps = true;
m_dep_topo_graph[x].push_back(type);
}
}
}

void MetaBlkService::add_sub_sb(meta_sub_type type, const uint8_t* context_data, uint64_t sz, void*& cookie) {
Expand Down Expand Up @@ -1074,84 +1082,103 @@ sisl::byte_array MetaBlkService::read_sub_sb_internal(const meta_blk* mblk) cons
void MetaBlkService::recover(bool do_comp_cb) {
// for each registered subsystem, look up in cache for their meta blks;
std::lock_guard< decltype(m_shutdown_mtx) > lg{m_shutdown_mtx};
for (auto& m : m_meta_blks) {
auto* mblk = m.second;
auto buf = read_sub_sb_internal(mblk);

// found a meta blk and callback to sub system;
const auto itr = m_sub_info.find(mblk->hdr.h.type);
if (itr != std::end(m_sub_info)) {
// if subsystem registered crc protection, verify crc before sending to subsystem;
if (itr->second.do_crc) {
const auto crc = crc32_ieee(init_crc32, s_cast< const uint8_t* >(buf->bytes), mblk->hdr.h.context_sz);

HS_REL_ASSERT_EQ(crc, uint32_cast(mblk->hdr.h.crc),
"[type={}], CRC mismatch: {}/{}, on mblk bid: {}, context_sz: {}", mblk->hdr.h.type,
crc, uint32_cast(mblk->hdr.h.crc), mblk->hdr.h.bid.to_string(),
uint64_cast(mblk->hdr.h.context_sz));
} else {
HS_LOG(DEBUG, metablk, "[type={}] meta blk found with bypassing crc.", mblk->hdr.h.type);
}
meta_subtype_vec_t ordered_subtypes;

// send the callbck;
auto& cb = itr->second.cb;
if (cb) { // cb could be nullptr because client want to get its superblock via read api;
// decompress if necessary
if (mblk->hdr.h.compressed) {
// HS_DBG_ASSERT_GE(mblk->hdr.h.context_sz, META_BLK_CONTEXT_SZ);
// TO DO: Might need to address alignment based on data or fast type
auto decompressed_buf{hs_utils::make_byte_array(mblk->hdr.h.src_context_sz, true /* aligned */,
sisl::buftag::compression, align_size())};
size_t decompressed_size = mblk->hdr.h.src_context_sz;
const auto ret{sisl::Compress::decompress(r_cast< const char* >(buf->bytes),
r_cast< char* >(decompressed_buf->bytes),
mblk->hdr.h.compressed_sz, &decompressed_size)};
if (ret != 0) {
LOGERROR("[type={}], negative result: {} from decompress trying to decompress the "
"data. compressed_sz: {}, src_context_sz: {}",
mblk->hdr.h.type, ret, uint64_cast(mblk->hdr.h.compressed_sz),
uint64_cast(mblk->hdr.h.src_context_sz));
HS_REL_ASSERT(false, "failed to decompress");
} else {
// decompressed_size must equal to input sz before compress
HS_REL_ASSERT_EQ(uint64_cast(mblk->hdr.h.src_context_sz),
uint64_cast(decompressed_size)); /* since decompressed_size is >=0 it
is safe to cast to uint64_t */
HS_LOG(DEBUG, metablk,
"[type={}] Successfully decompressed, compressed_sz: {}, src_context_sz: {}, "
"decompressed_size: {}",
mblk->hdr.h.type, uint64_cast(mblk->hdr.h.compressed_sz),
uint64_cast(mblk->hdr.h.src_context_sz), decompressed_size);
}
if (hs_utils::topological_sort(m_dep_topo_graph, ordered_subtypes)) {
throw homestore::homestore_exception(
"MetaBlkService has circular dependency, please check the dependency graph", homestore_error::init_failed);
}

// all the subsystems are divided into two parts.
// for subsystems in ordered_subtypes, we need to recover in order.
for (const auto& subtype : ordered_subtypes) {
recover_meta_sub_type(do_comp_cb, subtype);
}

// TODO: for independent subsystems, we can use concurrent recovery if necessary.
for (auto const& x : m_sub_info) {
auto& reg_info = x.second;
if (!reg_info.has_deps) { recover_meta_sub_type(do_comp_cb, x.first); }
}
}

void MetaBlkService::recover_meta_sub_type(bool do_comp_cb, const meta_sub_type& sub_type) {
for (const auto& m : m_sub_info[sub_type].meta_bids) {
auto mblk = m_meta_blks[m];
recover_meta_block(mblk);
}

if (do_comp_cb && m_sub_info[sub_type].comp_cb) {
m_sub_info[sub_type].comp_cb(true);
HS_LOG(DEBUG, metablk, "[type={}] completion callback sent.", sub_type);
}
}

void MetaBlkService::recover_meta_block(meta_blk* mblk) {
auto buf = read_sub_sb_internal(mblk);
// found a meta blk and callback to sub system;
const auto itr = m_sub_info.find(mblk->hdr.h.type);
if (itr != std::end(m_sub_info)) {
// if subsystem registered crc protection, verify crc before sending to subsystem;
if (itr->second.do_crc) {
const auto crc = crc32_ieee(init_crc32, s_cast< const uint8_t* >(buf->bytes), mblk->hdr.h.context_sz);

HS_REL_ASSERT_EQ(crc, uint32_cast(mblk->hdr.h.crc),
"[type={}], CRC mismatch: {}/{}, on mblk bid: {}, context_sz: {}", mblk->hdr.h.type, crc,
uint32_cast(mblk->hdr.h.crc), mblk->hdr.h.bid.to_string(),
uint64_cast(mblk->hdr.h.context_sz));
} else {
HS_LOG(DEBUG, metablk, "[type={}] meta blk found with bypassing crc.", mblk->hdr.h.type);
}

cb(mblk, decompressed_buf, mblk->hdr.h.src_context_sz);
// send the callbck;
auto& cb = itr->second.cb;
if (cb) { // cb could be nullptr because client want to get its superblock via read api;
// decompress if necessary
if (mblk->hdr.h.compressed) {
// HS_DBG_ASSERT_GE(mblk->hdr.h.context_sz, META_BLK_CONTEXT_SZ);
// TO DO: Might need to address alignment based on data or fast type
auto decompressed_buf{hs_utils::make_byte_array(mblk->hdr.h.src_context_sz, true /* aligned */,
sisl::buftag::compression, align_size())};
size_t decompressed_size = mblk->hdr.h.src_context_sz;
const auto ret{sisl::Compress::decompress(r_cast< const char* >(buf->bytes),
r_cast< char* >(decompressed_buf->bytes),
mblk->hdr.h.compressed_sz, &decompressed_size)};
if (ret != 0) {
LOGERROR("[type={}], negative result: {} from decompress trying to decompress the "
"data. compressed_sz: {}, src_context_sz: {}",
mblk->hdr.h.type, ret, uint64_cast(mblk->hdr.h.compressed_sz),
uint64_cast(mblk->hdr.h.src_context_sz));
HS_REL_ASSERT(false, "failed to decompress");
} else {
// There is use case that cb could be nullptr because client want to get its superblock via
// read api;
cb(mblk, buf, mblk->hdr.h.context_sz);
// decompressed_size must equal to input sz before compress
HS_REL_ASSERT_EQ(uint64_cast(mblk->hdr.h.src_context_sz),
uint64_cast(decompressed_size)); /* since decompressed_size is >=0 it
is safe to cast to uint64_t */
HS_LOG(DEBUG, metablk,
"[type={}] Successfully decompressed, compressed_sz: {}, src_context_sz: {}, "
"decompressed_size: {}",
mblk->hdr.h.type, uint64_cast(mblk->hdr.h.compressed_sz),
uint64_cast(mblk->hdr.h.src_context_sz), decompressed_size);
}

HS_LOG(DEBUG, metablk, "[type={}] meta blk sent with size: {}.", mblk->hdr.h.type,
uint64_cast(mblk->hdr.h.context_sz));
cb(mblk, decompressed_buf, mblk->hdr.h.src_context_sz);
} else {
// There is use case that cb could be nullptr because client want to get its superblock via
// read api;
cb(mblk, buf, mblk->hdr.h.context_sz);
}
} else {
HS_LOG(DEBUG, metablk, "[type={}], unregistered client found. ");

HS_LOG(DEBUG, metablk, "[type={}] meta blk sent with size: {}.", mblk->hdr.h.type,
uint64_cast(mblk->hdr.h.context_sz));
}
} else {
HS_LOG(DEBUG, metablk, "[type={}], unregistered client found. ");
#if 0
// should never arrive here since we do assert on type before write to disk;
HS_LOG_ASSERT( false, "[type={}] not registered for mblk found on disk. Skip this meta blk. ",
mblk->hdr.h.type);
#endif
}
}

if (do_comp_cb) {
// for each registered subsystem, do recovery complete callback;
for (auto& sub : m_sub_info) {
if (sub.second.comp_cb) {
sub.second.comp_cb(true);
HS_LOG(DEBUG, metablk, "[type={}] completion callback sent.", sub.first);
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/lib/meta/meta_sb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ struct MetaSubRegInfo {
std::set< uint64_t > meta_bids; // meta blk id
meta_blk_found_cb_t cb{nullptr};
meta_blk_recover_comp_cb_t comp_cb{nullptr};
bool has_deps{false};
};

// meta blk super block put as 1st block in the block chain;
Expand Down
Loading

0 comments on commit 3911682

Please sign in to comment.