Skip to content

Commit

Permalink
Begin back-porting LogDev::rollback() from 4.x
Browse files Browse the repository at this point in the history
  • Loading branch information
szmyd committed Dec 18, 2024
1 parent 2ca8fcc commit 75c50c7
Show file tree
Hide file tree
Showing 4 changed files with 301 additions and 44 deletions.
212 changes: 178 additions & 34 deletions src/homelogstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ SISL_LOGGING_DECL(logstore)
#define THIS_LOGDEV_PERIODIC_LOG(level, msg, ...) \
HS_PERIODIC_DETAILED_LOG(level, logstore, "logdev", m_family_id, , , msg, __VA_ARGS__)

LogDev::LogDev(const logstore_family_id_t f_id, const std::string& metablk_name) :
m_family_id{f_id}, m_logdev_meta{metablk_name} {
LogDev::LogDev(const logstore_family_id_t f_id, const std::string& logdev_name) :
m_family_id{f_id}, m_logdev_meta{logdev_name} {
m_flush_size_multiple = 0;
if (f_id == HomeLogStoreMgr::DATA_LOG_FAMILY_IDX) {
m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_data_logdev);
Expand All @@ -47,9 +47,7 @@ LogDev::LogDev(const logstore_family_id_t f_id, const std::string& metablk_name)

LogDev::~LogDev() = default;

void LogDev::meta_blk_found(meta_blk* const mblk, const sisl::byte_view buf, const size_t size) {
m_logdev_meta.meta_buf_found(buf, static_cast< void* >(mblk));
}
void LogDev::meta_blk_found(meta_blk* const, const sisl::byte_view, const size_t) {}

uint32_t LogDev::get_align_size() const { return m_blkstore->get_align_size(); }

Expand Down Expand Up @@ -182,10 +180,17 @@ void LogDev::do_load(const off_t device_cursor) {
b.set_size(rec->size);
if (m_last_truncate_idx == -1) { m_last_truncate_idx = header->start_idx() + i; }
if (m_logfound_cb) {
THIS_LOGDEV_LOG(TRACE, "seq num {}, log indx {}, group dev offset {} size {}", rec->store_seq_num,
(header->start_idx() + i), group_dev_offset, rec->size);
m_logfound_cb(rec->store_id, rec->store_seq_num, {header->start_idx() + i, group_dev_offset},
flush_ld_key, b, (header->nrecords() - (i + 1)));
// Validate if the id is present in rollback info
if (m_logdev_meta.is_rolled_back(rec->store_id, header->start_idx() + i)) {
THIS_LOGDEV_LOG(
DEBUG, "logstore_id[{}] log_idx={}, lsn={} has been rolledback, not notifying the logstore",
rec->store_id, (header->start_idx() + i), rec->store_seq_num);
} else {
THIS_LOGDEV_LOG(TRACE, "seq num {}, log indx {}, group dev offset {} size {}", rec->store_seq_num,
(header->start_idx() + i), group_dev_offset, rec->size);
m_logfound_cb(rec->store_id, rec->store_seq_num, {header->start_idx() + i, group_dev_offset},
flush_ld_key, b, (header->nrecords() - (i + 1)));
}
}
++i;
}
Expand Down Expand Up @@ -585,6 +590,9 @@ uint64_t LogDev::truncate(const logdev_key& key) {
#endif
}

// We can remove the rollback records of those upto which logid is getting truncated
m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */);

m_logdev_meta.persist();
#ifdef _PRERELEASE
if (garbage_collect && homestore_flip->test_flip("logdev_abort_after_garbage")) {
Expand All @@ -602,6 +610,11 @@ void LogDev::update_store_superblk(const logstore_id_t idx, const logstore_super
m_logdev_meta.update_store_superblk(idx, meta, persist_now);
}

void LogDev::rollback(logstore_id_t store_id, logid_range_t id_range) {
std::unique_lock lg{m_meta_mutex};
m_logdev_meta.add_rollback_record(store_id, id_range, true);
}

sisl::status_response LogDev::get_status(const sisl::status_request& request) const {
sisl::status_response response;
response.json["current_log_idx"] = m_log_idx.load(std::memory_order_relaxed);
Expand All @@ -618,14 +631,33 @@ sisl::status_response LogDev::get_status(const sisl::status_request& request) co
}

/////////////////////////////// LogDevMetadata Section ///////////////////////////////////////
LogDevMetadata::LogDevMetadata(const std::string& metablk_name) : m_metablk_name{metablk_name} {}
LogDevMetadata::LogDevMetadata(const std::string& logdev_name) : m_metablk_name{logdev_name} {
MetaBlkMgrSI()->register_handler(
logdev_name + "_logdev_sb",
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) {
logdev_super_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr);

MetaBlkMgrSI()->register_handler(
logdev_name + "_rollback_sb",
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) {
rollback_super_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr);
}

logdev_superblk* LogDevMetadata::create() {
const auto req_sz{required_sb_size(0)};
auto req_sz{logdev_sb_size_needed(0)};
// TO DO: Might need to address alignment based on data or fast type
m_raw_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz), sisl::buftag::metablk,
MetaBlkMgrSI()->get_align_size());
m_sb = new (m_raw_buf->bytes) logdev_superblk();
m_raw_logdev_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_sb = new (m_raw_logdev_buf->bytes) logdev_superblk();

req_sz = rollback_superblk::size_needed(1);
m_raw_rollback_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_rollback_sb = new (m_raw_rollback_buf->bytes) rollback_superblk();

logstore_superblk* const sb_area{m_sb->get_logstore_superblk()};
std::fill_n(sb_area, store_capacity(), logstore_superblk::default_value());
Expand All @@ -636,22 +668,32 @@ logdev_superblk* LogDevMetadata::create() {
}

void LogDevMetadata::reset() {
m_raw_buf.reset();
m_raw_logdev_buf.reset();
m_sb = nullptr;
m_meta_mgr_cookie = nullptr;
m_logdev_cookie = nullptr;
m_id_reserver.reset();
m_store_info.clear();
}

void LogDevMetadata::meta_buf_found(const sisl::byte_view& buf, void* const meta_cookie) {
m_meta_mgr_cookie = meta_cookie;
m_raw_buf = hs_utils::extract_byte_array(buf, true, MetaBlkMgrSI()->get_align_size());
m_sb = reinterpret_cast< logdev_superblk* >(m_raw_buf->bytes);
void LogDevMetadata::logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
m_logdev_cookie = meta_cookie;
m_raw_logdev_buf = hs_utils::extract_byte_array(buf, true, MetaBlkMgrSI()->get_align_size());
m_sb = reinterpret_cast< logdev_superblk* >(m_raw_logdev_buf->bytes);

HS_REL_ASSERT_EQ(m_sb->get_magic(), logdev_superblk::LOGDEV_SB_MAGIC, "Invalid logdev metablk, magic mismatch");
HS_REL_ASSERT_EQ(m_sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Invalid version of logdev metablk");
}

void LogDevMetadata::rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
m_rollback_cookie = meta_cookie;
m_raw_rollback_buf = hs_utils::extract_byte_array(buf, true, MetaBlkMgrSI()->get_align_size());
m_rollback_sb = reinterpret_cast< rollback_superblk* >(m_raw_rollback_buf->bytes);

HS_REL_ASSERT_EQ(m_rollback_sb->get_magic(), rollback_superblk::ROLLBACK_SB_MAGIC, "Rollback sb magic mismatch");
HS_REL_ASSERT_EQ(m_rollback_sb->get_version(), rollback_superblk::ROLLBACK_SB_VERSION,
"Rollback sb version mismatch");
}

std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::load() {
std::vector< std::pair< logstore_id_t, logstore_superblk > > ret_list;
ret_list.reserve(1024);
Expand All @@ -662,7 +704,7 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa
m_id_reserver = std::make_unique< sisl::IDReserver >();
}

HS_REL_ASSERT_NE(m_raw_buf->bytes, nullptr, "Load called without getting metadata");
HS_REL_ASSERT_NE(m_raw_logdev_buf->bytes, nullptr, "Load called without getting metadata");
HS_REL_ASSERT_LE(m_sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Logdev super blk version mismatch");

const logstore_superblk* const store_sb{m_sb->get_logstore_superblk()};
Expand All @@ -678,6 +720,11 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa
++idx;
}

for (uint32_t i{0}; i < m_rollback_sb->num_records; ++i) {
const auto& rec = m_rollback_sb->at(i);
m_rollback_info.insert({rec.store_id, rec.idx_range});
}

return ret_list;
}

Expand All @@ -689,11 +736,23 @@ void LogDevMetadata::persist() {
}
#endif

if (m_meta_mgr_cookie) {
MetaBlkMgrSI()->update_sub_sb(static_cast< const void* >(m_raw_buf->bytes), m_raw_buf->size, m_meta_mgr_cookie);
if (m_logdev_cookie) {
MetaBlkMgrSI()->update_sub_sb(static_cast< const void* >(m_raw_logdev_buf->bytes), m_raw_logdev_buf->size,
m_logdev_cookie);
} else {
MetaBlkMgrSI()->add_sub_sb(m_metablk_name, static_cast< const void* >(m_raw_buf->bytes), m_raw_buf->size,
m_meta_mgr_cookie);
MetaBlkMgrSI()->add_sub_sb(m_metablk_name, static_cast< const void* >(m_raw_logdev_buf->bytes),
m_raw_logdev_buf->size, m_logdev_cookie);
}

if (m_rollback_info_dirty) {
if (m_rollback_cookie) {
MetaBlkMgrSI()->update_sub_sb(static_cast< const void* >(m_raw_rollback_buf->bytes),
m_raw_rollback_buf->size, m_rollback_cookie);
} else {
MetaBlkMgrSI()->add_sub_sb(m_metablk_name, static_cast< const void* >(m_raw_rollback_buf->bytes),
m_raw_rollback_buf->size, m_rollback_cookie);
}
m_rollback_info_dirty = false;
}
}

Expand All @@ -715,6 +774,7 @@ logstore_id_t LogDevMetadata::reserve_store(const bool persist_now) {
void LogDevMetadata::unreserve_store(const logstore_id_t idx, const bool persist_now) {
m_id_reserver->unreserve(idx);
m_store_info.erase(idx);
remove_all_rollback_records(idx, persist_now);

resize_if_needed();
if (idx < *m_store_info.rbegin()) {
Expand Down Expand Up @@ -761,29 +821,113 @@ void LogDevMetadata::set_start_dev_offset(const off_t offset, const logid_t key_
logid_t LogDevMetadata::get_start_log_idx() const { return m_sb->key_idx; }

bool LogDevMetadata::resize_if_needed() {
auto req_sz{required_sb_size((m_store_info.size() == 0) ? 0u : *m_store_info.rbegin() + 1)};
auto req_sz{logdev_sb_size_needed((m_store_info.size() == 0) ? 0u : *m_store_info.rbegin() + 1)};
if (MetaBlkMgrSI()->is_aligned_buf_needed(req_sz)) {
req_sz = sisl::round_up(req_sz, MetaBlkMgrSI()->get_align_size());
}
if (req_sz != m_raw_buf->size) {
const auto old_buf{m_raw_buf};
if (req_sz != m_raw_logdev_buf->size) {
const auto old_buf{m_raw_logdev_buf};
// TO DO: Might need to address alignment based on data or fast type
m_raw_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_sb = new (m_raw_buf->bytes) logdev_superblk();
m_raw_logdev_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_sb = new (m_raw_logdev_buf->bytes) logdev_superblk();

logstore_superblk* const sb_area{m_sb->get_logstore_superblk()};
std::fill_n(sb_area, store_capacity(), logstore_superblk::default_value());

std::memcpy(static_cast< void* >(m_raw_buf->bytes), static_cast< const void* >(old_buf->bytes),
std::min(old_buf->size, m_raw_buf->size));
std::memcpy(static_cast< void* >(m_raw_logdev_buf->bytes), static_cast< const void* >(old_buf->bytes),
std::min(old_buf->size, m_raw_logdev_buf->size));
return true;
} else {
return false;
}
}

uint32_t LogDevMetadata::store_capacity() const {
return (m_raw_buf->size - sizeof(logdev_superblk)) / sizeof(logstore_superblk);
return (m_raw_logdev_buf->size - sizeof(logdev_superblk)) / sizeof(logstore_superblk);
}

void LogDevMetadata::add_rollback_record(logstore_id_t store_id, logid_range_t id_range, bool persist_now) {
m_rollback_info.insert({store_id, id_range});
resize_rollback_sb_if_needed();
m_rollback_sb->add_record(store_id, id_range);

if (persist_now) { persist(); }
m_rollback_info_dirty = !persist_now;
}

void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_now) {
uint32_t n_removed{0};
for (auto i = m_rollback_sb->num_records; i > 0; --i) {
auto& rec = m_rollback_sb->at(i - 1);
if (rec.idx_range.second <= upto_id) {
m_rollback_sb->remove_ith_record(i - 1);
++n_removed;
}
}

if (n_removed) {
for (auto it = m_rollback_info.begin(); it != m_rollback_info.end();) {
if (it->second.second <= upto_id) {
it = m_rollback_info.erase(it);
} else {
++it;
}
}
resize_rollback_sb_if_needed();
if (persist_now) { persist(); }
m_rollback_info_dirty = !persist_now;
}
}

void LogDevMetadata::remove_all_rollback_records(logstore_id_t store_id, bool persist_now) {
uint32_t n_removed{0};
for (auto i = m_rollback_sb->num_records; i > 0; --i) {
auto& rec = m_rollback_sb->at(i - 1);
if (rec.store_id == store_id) {
m_rollback_sb->remove_ith_record(i - 1);
++n_removed;
}
}
if (n_removed) {
m_rollback_info.erase(store_id);
resize_rollback_sb_if_needed();
if (persist_now) { persist(); }
m_rollback_info_dirty = !persist_now;
}
}

uint32_t LogDevMetadata::num_rollback_records(logstore_id_t store_id) const {
HS_DBG_ASSERT_EQ(m_rollback_sb->num_records, m_rollback_info.size(),
"Rollback record count mismatch between sb and in-memory");
return m_rollback_info.count(store_id);
}

bool LogDevMetadata::is_rolled_back(logstore_id_t store_id, logid_t logid) const {
auto it_pair = m_rollback_info.equal_range(store_id);
for (auto it = it_pair.first; it != it_pair.second; ++it) {
const logid_range_t& log_id_range = it->second;
if ((logid >= log_id_range.first) && (logid <= log_id_range.second)) { return true; }
}
return false;
}

bool LogDevMetadata::resize_rollback_sb_if_needed() {
auto req_sz = rollback_superblk::size_needed(m_rollback_info.size());
if (MetaBlkMgrSI()->is_aligned_buf_needed(req_sz)) {
req_sz = sisl::round_up(req_sz, MetaBlkMgrSI()->get_align_size());
}

if (req_sz != m_raw_rollback_buf->size) {
auto new_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_rollback_sb = new (new_buf->bytes) rollback_superblk();
std::memcpy(static_cast< void* >(new_buf->bytes), static_cast< const void* >(m_raw_rollback_buf->bytes),
std::min(m_raw_logdev_buf->size, new_buf->size));
m_raw_rollback_buf = new_buf;
return true;
} else {
return false;
}
}
} // namespace homestore
Loading

0 comments on commit 75c50c7

Please sign in to comment.