Skip to content

Commit

Permalink
Use topological sort to reorder the recovery of metaservice
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Oct 24, 2023
1 parent 202f07f commit 4434840
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 73 deletions.
6 changes: 5 additions & 1 deletion src/include/homestore/meta_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cstdint>
#include <functional>
#include <map>
#include <unordered_map>
#include <memory>
#include <mutex>
#include <string>
Expand Down Expand Up @@ -87,6 +88,7 @@ class MetaBlkService {
MetablkMetrics m_metrics;
bool m_inited{false};
std::unique_ptr< meta_vdev_context > m_meta_vdev_context;
std::unordered_map< meta_sub_type, std::vector< meta_sub_type > > m_dependency_topological_graph;

public:
MetaBlkService(const char* name = "MetaBlkStore");
Expand Down Expand Up @@ -121,7 +123,7 @@ class MetaBlkService {
* @param cb : subsystem cb
*/
void register_handler(meta_sub_type type, const meta_blk_found_cb_t& cb, const meta_blk_recover_comp_cb_t& comp_cb,
bool do_crc = true);
bool do_crc = true, std::vector< meta_sub_type > dependencies = {});

/**
* @brief
Expand Down Expand Up @@ -332,6 +334,8 @@ class MetaBlkService {
uint64_t max_compress_memory_size() const;
uint64_t init_compress_memory_size() const;

std::vector< meta_sub_type > topological_sort(std::unordered_map< meta_sub_type, std::vector< meta_sub_type > >);

public:
bool get_skip_hdr_check() const;

Expand Down
2 changes: 1 addition & 1 deletion src/lib/blkdata_svc/blk_read_tracker.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class BlkReadTracker {
/**
* @brief : decrease the reference count of the BlkId by 1 in this read tracker.
* If the ref count drops to zero, it means no read is pending on this blkid and if there is a waiter on this blkid,
* callback should be trigggered and all entries associated with this blkid (there could be more than one
* callback should be triggered and all entries associated with this blkid (there could be more than one
* sub_ranges) should be removed.
*
* @param blkid : blkid that is being dereferenced;
Expand Down
173 changes: 108 additions & 65 deletions src/lib/meta/meta_blk_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,14 +351,16 @@ void MetaBlkService::deregister_handler(meta_sub_type type) {
const auto it = m_sub_info.find(type);
if (it != std::end(m_sub_info)) {
m_sub_info.erase(it);
m_dependency_topological_graph.erase(type);
HS_LOG(INFO, metablk, "[type={}] deregistered Successfully", type);
} else {
HS_LOG(INFO, metablk, "[type={}] not found in registered list, no-op", type);
}
}

void MetaBlkService::register_handler(meta_sub_type type, const meta_blk_found_cb_t& cb,
const meta_blk_recover_comp_cb_t& comp_cb, bool do_crc) {
const meta_blk_recover_comp_cb_t& comp_cb, bool do_crc,
std::vector< meta_sub_type > dependencies) {
std::lock_guard< decltype(m_meta_mtx) > lk(m_meta_mtx);
HS_REL_ASSERT_LT(type.length(), MAX_SUBSYS_TYPE_LEN, "type len: {} should not exceed len: {}", type.length(),
MAX_SUBSYS_TYPE_LEN);
Expand All @@ -375,6 +377,7 @@ void MetaBlkService::register_handler(meta_sub_type type, const meta_blk_found_c
m_sub_info[type].cb = cb;
m_sub_info[type].comp_cb = comp_cb;
m_sub_info[type].do_crc = do_crc ? 1 : 0;
m_dependency_topological_graph[type] = dependencies;
HS_LOG(INFO, metablk, "[type={}] registered with do_crc: {}", type, do_crc);
}

Expand Down Expand Up @@ -1074,85 +1077,125 @@ sisl::byte_array MetaBlkService::read_sub_sb_internal(const meta_blk* mblk) cons
void MetaBlkService::recover(bool do_comp_cb) {
    // Replay persisted meta blks back to their owning subsystems.
    //
    // Subsystems are visited in the order produced by topological_sort() over the
    // registered dependency graph, so dependent subsystems are replayed relative to
    // the types they were registered with.
    // NOTE(review): topological_sort() emits a type *before* the types listed as its
    // dependencies — confirm this matches the intended recovery direction.
    //
    // @param do_comp_cb : when true, fire each subsystem's recovery-complete callback
    //                     after all of its meta blks have been replayed.
    std::lock_guard< decltype(m_shutdown_mtx) > lg{m_shutdown_mtx};
    const auto& subtype_recovery_order = topological_sort(m_dependency_topological_graph);
    for (const auto& subtype : subtype_recovery_order) {
        for (const auto& m : m_sub_info[subtype].meta_bids) {
            auto mblk = m_meta_blks[m];
            auto buf = read_sub_sb_internal(mblk);

            // found a meta blk and callback to sub system;
            const auto itr = m_sub_info.find(mblk->hdr.h.type);
            if (itr != std::end(m_sub_info)) {
                // if subsystem registered crc protection, verify crc before sending to subsystem;
                if (itr->second.do_crc) {
                    const auto crc =
                        crc32_ieee(init_crc32, s_cast< const uint8_t* >(buf->bytes), mblk->hdr.h.context_sz);

                    HS_REL_ASSERT_EQ(crc, uint32_cast(mblk->hdr.h.crc),
                                     "[type={}], CRC mismatch: {}/{}, on mblk bid: {}, context_sz: {}",
                                     mblk->hdr.h.type, crc, uint32_cast(mblk->hdr.h.crc), mblk->hdr.h.bid.to_string(),
                                     uint64_cast(mblk->hdr.h.context_sz));
                } else {
                    HS_LOG(DEBUG, metablk, "[type={}] meta blk found with bypassing crc.", mblk->hdr.h.type);
                }

                // send the callback;
                auto& cb = itr->second.cb;
                if (cb) { // cb could be nullptr because client want to get its superblock via read api;
                    // decompress if necessary
                    if (mblk->hdr.h.compressed) {
                        // HS_DBG_ASSERT_GE(mblk->hdr.h.context_sz, META_BLK_CONTEXT_SZ);
                        // TO DO: Might need to address alignment based on data or fast type
                        auto decompressed_buf{hs_utils::make_byte_array(mblk->hdr.h.src_context_sz, true /* aligned */,
                                                                        sisl::buftag::compression, align_size())};
                        size_t decompressed_size = mblk->hdr.h.src_context_sz;
                        const auto ret{sisl::Compress::decompress(r_cast< const char* >(buf->bytes),
                                                                  r_cast< char* >(decompressed_buf->bytes),
                                                                  mblk->hdr.h.compressed_sz, &decompressed_size)};
                        if (ret != 0) {
                            LOGERROR("[type={}], negative result: {} from decompress trying to decompress the "
                                     "data. compressed_sz: {}, src_context_sz: {}",
                                     mblk->hdr.h.type, ret, uint64_cast(mblk->hdr.h.compressed_sz),
                                     uint64_cast(mblk->hdr.h.src_context_sz));
                            HS_REL_ASSERT(false, "failed to decompress");
                        } else {
                            // decompressed_size must equal to input sz before compress
                            HS_REL_ASSERT_EQ(uint64_cast(mblk->hdr.h.src_context_sz),
                                             uint64_cast(decompressed_size)); /* since decompressed_size is >=0 it
                                                                                 is safe to cast to uint64_t */
                            HS_LOG(DEBUG, metablk,
                                   "[type={}] Successfully decompressed, compressed_sz: {}, src_context_sz: {}, "
                                   "decompressed_size: {}",
                                   mblk->hdr.h.type, uint64_cast(mblk->hdr.h.compressed_sz),
                                   uint64_cast(mblk->hdr.h.src_context_sz), decompressed_size);
                        }

                        cb(mblk, decompressed_buf, mblk->hdr.h.src_context_sz);
                    } else {
                        // There is use case that cb could be nullptr because client want to get its superblock via
                        // read api;
                        cb(mblk, buf, mblk->hdr.h.context_sz);
                    }

                    HS_LOG(DEBUG, metablk, "[type={}] meta blk sent with size: {}.", mblk->hdr.h.type,
                           uint64_cast(mblk->hdr.h.context_sz));
                }
            } else {
                // BUGFIX: the format string carries a "{}" placeholder but no argument was
                // supplied; pass the on-disk type so the log line is well-formed.
                HS_LOG(DEBUG, metablk, "[type={}], unregistered client found. ", mblk->hdr.h.type);
#if 0
                // should never arrive here since we do assert on type before write to disk;
                HS_LOG_ASSERT( false, "[type={}] not registered for mblk found on disk. Skip this meta blk. ",
                        mblk->hdr.h.type);
#endif
            }
        }

        // per-subsystem recovery-complete callback, fired once all of its meta blks are replayed;
        if (do_comp_cb && m_sub_info[subtype].comp_cb) {
            m_sub_info[subtype].comp_cb(true);
            HS_LOG(DEBUG, metablk, "[type={}] completion callback sent.", subtype);
        }
    }
}
std::vector< meta_sub_type >
MetaBlkService::topological_sort(std::unordered_map< meta_sub_type, std::vector< meta_sub_type > > topological_graph) {
std::vector< meta_sub_type > result;
std::unordered_map< meta_sub_type, int > in_degree;
std::queue< meta_sub_type > q;

// Calculate in-degree of each vertex
for (const auto& [vertex, edges] : topological_graph) {
in_degree[vertex] = 0;
}
for (const auto& [vertex, edges] : topological_graph) {
for (const auto& edge : edges) {
in_degree[edge]++;
}
}

// Add vertices with in-degree 0 to the queue
for (const auto& [vertex, degree] : in_degree) {
if (degree == 0) { q.push(vertex); }
}

// Process vertices in the queue
while (!q.empty()) {
const auto vertex = q.front();
q.pop();
result.push_back(vertex);

for (const auto& edge : topological_graph[vertex]) {
in_degree[edge]--;
if (in_degree[edge] == 0) { q.push(edge); }
}
}

// Check for cycle
if (result.size() != topological_graph.size()) {
throw std::runtime_error("Topological sort failed: cycle detected");
}

return result;
}

//
Expand Down
Loading

0 comments on commit 4434840

Please sign in to comment.