Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Nov 1, 2023
1 parent 1e0c2bc commit a0ea4bb
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 65 deletions.
2 changes: 2 additions & 0 deletions src/include/homestore/meta_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ class MetaBlkService {
bool scan_and_load_meta_blks(meta_blk_map_t& meta_blks, ovf_hdr_map_t& ovf_blk_hdrs, BlkId* last_mblk_id,
client_info_map_t& sub_info);

void recover_meta_block(meta_blk* meta_block);

public:
bool verify_metablk_store();

Expand Down
170 changes: 105 additions & 65 deletions src/lib/meta/meta_blk_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1081,83 +1081,123 @@ sisl::byte_array MetaBlkService::read_sub_sb_internal(const meta_blk* mblk) cons
void MetaBlkService::recover(bool do_comp_cb) {
// for each registered subsystem, look up in cache for their meta blks;
std::lock_guard< decltype(m_shutdown_mtx) > lg{m_shutdown_mtx};
const auto& subtype_recovery_order = topological_sort(m_dependency_topological_graph);
for (const auto& subtype : subtype_recovery_order) {

meta_subtype_vec_t independent_subsystems;
std::unordered_set< meta_sub_type > visited;

for (const auto& [type, dependencies] : m_dependency_topological_graph) {
// get all the vertex whose indegree is not 0
for (const auto& dependency : dependencies) {
visited.insert(dependency);
}
}

for (const auto& [type, dependencies] : m_dependency_topological_graph) {
// outdegree == 0 and indegree == 0 means no dependency
if (dependencies.empty() && visited.find(type) == visited.end()) { independent_subsystems.push_back(type); }
}

for (const auto& independent_subsystem : independent_subsystems) {
m_dependency_topological_graph.erase(independent_subsystem);
}

// all the subsystems are divided into two parts.
// for subsystems in m_dependency_topological_graph, we need to do topological sort and recover in order.

const auto& dependent_subtype_recovery_order = topological_sort(m_dependency_topological_graph);
for (const auto& subtype : dependent_subtype_recovery_order) {
for (const auto& m : m_sub_info[subtype].meta_bids) {
auto mblk = m_meta_blks[m];
auto buf = read_sub_sb_internal(mblk);

// found a meta blk and callback to sub system;
const auto itr = m_sub_info.find(mblk->hdr.h.type);
if (itr != std::end(m_sub_info)) {
// if subsystem registered crc protection, verify crc before sending to subsystem;
if (itr->second.do_crc) {
const auto crc =
crc32_ieee(init_crc32, s_cast< const uint8_t* >(buf->bytes), mblk->hdr.h.context_sz);

HS_REL_ASSERT_EQ(crc, uint32_cast(mblk->hdr.h.crc),
"[type={}], CRC mismatch: {}/{}, on mblk bid: {}, context_sz: {}",
mblk->hdr.h.type, crc, uint32_cast(mblk->hdr.h.crc), mblk->hdr.h.bid.to_string(),
uint64_cast(mblk->hdr.h.context_sz));
} else {
HS_LOG(DEBUG, metablk, "[type={}] meta blk found with bypassing crc.", mblk->hdr.h.type);
}
recover_meta_block(mblk);
}

// send the callbck;
auto& cb = itr->second.cb;
if (cb) { // cb could be nullptr because client want to get its superblock via read api;
// decompress if necessary
if (mblk->hdr.h.compressed) {
// HS_DBG_ASSERT_GE(mblk->hdr.h.context_sz, META_BLK_CONTEXT_SZ);
// TO DO: Might need to address alignment based on data or fast type
auto decompressed_buf{hs_utils::make_byte_array(mblk->hdr.h.src_context_sz, true /* aligned */,
sisl::buftag::compression, align_size())};
size_t decompressed_size = mblk->hdr.h.src_context_sz;
const auto ret{sisl::Compress::decompress(r_cast< const char* >(buf->bytes),
r_cast< char* >(decompressed_buf->bytes),
mblk->hdr.h.compressed_sz, &decompressed_size)};
if (ret != 0) {
LOGERROR("[type={}], negative result: {} from decompress trying to decompress the "
"data. compressed_sz: {}, src_context_sz: {}",
mblk->hdr.h.type, ret, uint64_cast(mblk->hdr.h.compressed_sz),
uint64_cast(mblk->hdr.h.src_context_sz));
HS_REL_ASSERT(false, "failed to decompress");
} else {
// decompressed_size must equal to input sz before compress
HS_REL_ASSERT_EQ(uint64_cast(mblk->hdr.h.src_context_sz),
uint64_cast(decompressed_size)); /* since decompressed_size is >=0 it
is safe to cast to uint64_t */
HS_LOG(DEBUG, metablk,
"[type={}] Successfully decompressed, compressed_sz: {}, src_context_sz: {}, "
"decompressed_size: {}",
mblk->hdr.h.type, uint64_cast(mblk->hdr.h.compressed_sz),
uint64_cast(mblk->hdr.h.src_context_sz), decompressed_size);
}
if (do_comp_cb && m_sub_info[subtype].comp_cb) {
m_sub_info[subtype].comp_cb(true);
HS_LOG(DEBUG, metablk, "[type={}] completion callback sent.", subtype);
}
}

cb(mblk, decompressed_buf, mblk->hdr.h.src_context_sz);
} else {
// There is use case that cb could be nullptr because client want to get its superblock via
// read api;
cb(mblk, buf, mblk->hdr.h.context_sz);
}
// for subsystems in independent_subsystems, we can use concurrent recovery if necessary.
// for now, we recover independent_subsystems in current thread.
// we can start a thread to recover independent_subsystems concurrently to improve performace in the future.
for (const auto& subtype : independent_subsystems) {
for (const auto& m : m_sub_info[subtype].meta_bids) {
auto mblk = m_meta_blks[m];
recover_meta_block(mblk);
}

HS_LOG(DEBUG, metablk, "[type={}] meta blk sent with size: {}.", mblk->hdr.h.type,
uint64_cast(mblk->hdr.h.context_sz));
if (do_comp_cb && m_sub_info[subtype].comp_cb) {
m_sub_info[subtype].comp_cb(true);
HS_LOG(DEBUG, metablk, "[type={}] completion callback sent.", subtype);
}
}
}

void MetaBlkService::recover_meta_block(meta_blk* mblk) {
auto buf = read_sub_sb_internal(mblk);
// found a meta blk and callback to sub system;
const auto itr = m_sub_info.find(mblk->hdr.h.type);
if (itr != std::end(m_sub_info)) {
// if subsystem registered crc protection, verify crc before sending to subsystem;
if (itr->second.do_crc) {
const auto crc = crc32_ieee(init_crc32, s_cast< const uint8_t* >(buf->bytes), mblk->hdr.h.context_sz);

HS_REL_ASSERT_EQ(crc, uint32_cast(mblk->hdr.h.crc),
"[type={}], CRC mismatch: {}/{}, on mblk bid: {}, context_sz: {}", mblk->hdr.h.type, crc,
uint32_cast(mblk->hdr.h.crc), mblk->hdr.h.bid.to_string(),
uint64_cast(mblk->hdr.h.context_sz));
} else {
HS_LOG(DEBUG, metablk, "[type={}] meta blk found with bypassing crc.", mblk->hdr.h.type);
}

// send the callbck;
auto& cb = itr->second.cb;
if (cb) { // cb could be nullptr because client want to get its superblock via read api;
// decompress if necessary
if (mblk->hdr.h.compressed) {
// HS_DBG_ASSERT_GE(mblk->hdr.h.context_sz, META_BLK_CONTEXT_SZ);
// TO DO: Might need to address alignment based on data or fast type
auto decompressed_buf{hs_utils::make_byte_array(mblk->hdr.h.src_context_sz, true /* aligned */,
sisl::buftag::compression, align_size())};
size_t decompressed_size = mblk->hdr.h.src_context_sz;
const auto ret{sisl::Compress::decompress(r_cast< const char* >(buf->bytes),
r_cast< char* >(decompressed_buf->bytes),
mblk->hdr.h.compressed_sz, &decompressed_size)};
if (ret != 0) {
LOGERROR("[type={}], negative result: {} from decompress trying to decompress the "
"data. compressed_sz: {}, src_context_sz: {}",
mblk->hdr.h.type, ret, uint64_cast(mblk->hdr.h.compressed_sz),
uint64_cast(mblk->hdr.h.src_context_sz));
HS_REL_ASSERT(false, "failed to decompress");
} else {
// decompressed_size must equal to input sz before compress
HS_REL_ASSERT_EQ(uint64_cast(mblk->hdr.h.src_context_sz),
uint64_cast(decompressed_size)); /* since decompressed_size is >=0 it
is safe to cast to uint64_t */
HS_LOG(DEBUG, metablk,
"[type={}] Successfully decompressed, compressed_sz: {}, src_context_sz: {}, "
"decompressed_size: {}",
mblk->hdr.h.type, uint64_cast(mblk->hdr.h.compressed_sz),
uint64_cast(mblk->hdr.h.src_context_sz), decompressed_size);
}

cb(mblk, decompressed_buf, mblk->hdr.h.src_context_sz);
} else {
HS_LOG(DEBUG, metablk, "[type={}], unregistered client found. ");
// There is use case that cb could be nullptr because client want to get its superblock via
// read api;
cb(mblk, buf, mblk->hdr.h.context_sz);
}

HS_LOG(DEBUG, metablk, "[type={}] meta blk sent with size: {}.", mblk->hdr.h.type,
uint64_cast(mblk->hdr.h.context_sz));
}
} else {
HS_LOG(DEBUG, metablk, "[type={}], unregistered client found. ");
#if 0
// should never arrive here since we do assert on type before write to disk;
HS_LOG_ASSERT( false, "[type={}] not registered for mblk found on disk. Skip this meta blk. ",
mblk->hdr.h.type);
#endif
}
}

if (do_comp_cb && m_sub_info[subtype].comp_cb) {
m_sub_info[subtype].comp_cb(true);
HS_LOG(DEBUG, metablk, "[type={}] completion callback sent.", subtype);
}
}
}

Expand Down

0 comments on commit a0ea4bb

Please sign in to comment.