Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Begin back-porting LogDev::rollback() from 4.x #613

Open
wants to merge 3 commits into
base: stable/v3.7.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def build_requirements(self):

def requirements(self):
self.requires("iomgr/[~=8]")
self.requires("sisl/[~=8]")
self.requires("sisl/[~=8.9]")

# FOSS, rarely updated
self.requires("boost/1.79.0")
Expand Down
212 changes: 177 additions & 35 deletions src/homelogstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ SISL_LOGGING_DECL(logstore)
#define THIS_LOGDEV_PERIODIC_LOG(level, msg, ...) \
HS_PERIODIC_DETAILED_LOG(level, logstore, "logdev", m_family_id, , , msg, __VA_ARGS__)

LogDev::LogDev(const logstore_family_id_t f_id, const std::string& metablk_name) :
m_family_id{f_id}, m_logdev_meta{metablk_name} {
LogDev::LogDev(const logstore_family_id_t f_id, const std::string& logdev_name) :
m_family_id{f_id}, m_logdev_meta{logdev_name} {
m_flush_size_multiple = 0;
if (f_id == HomeLogStoreMgr::DATA_LOG_FAMILY_IDX) {
m_flush_size_multiple = HS_DYNAMIC_CONFIG(logstore->flush_size_multiple_data_logdev);
Expand All @@ -47,10 +47,6 @@ LogDev::LogDev(const logstore_family_id_t f_id, const std::string& metablk_name)

LogDev::~LogDev() = default;

void LogDev::meta_blk_found(meta_blk* const mblk, const sisl::byte_view buf, const size_t size) {
m_logdev_meta.meta_buf_found(buf, static_cast< void* >(mblk));
}

uint32_t LogDev::get_align_size() const { return m_blkstore->get_align_size(); }

void LogDev::start(const bool format, JournalVirtualDev* blk_store) {
Expand Down Expand Up @@ -182,10 +178,17 @@ void LogDev::do_load(const off_t device_cursor) {
b.set_size(rec->size);
if (m_last_truncate_idx == -1) { m_last_truncate_idx = header->start_idx() + i; }
if (m_logfound_cb) {
THIS_LOGDEV_LOG(TRACE, "seq num {}, log indx {}, group dev offset {} size {}", rec->store_seq_num,
(header->start_idx() + i), group_dev_offset, rec->size);
m_logfound_cb(rec->store_id, rec->store_seq_num, {header->start_idx() + i, group_dev_offset},
flush_ld_key, b, (header->nrecords() - (i + 1)));
// Validate if the id is present in rollback info
if (m_logdev_meta.is_rolled_back(rec->store_id, header->start_idx() + i)) {
THIS_LOGDEV_LOG(
DEBUG, "logstore_id[{}] log_idx={}, lsn={} has been rolledback, not notifying the logstore",
rec->store_id, (header->start_idx() + i), rec->store_seq_num);
} else {
THIS_LOGDEV_LOG(TRACE, "seq num {}, log indx {}, group dev offset {} size {}", rec->store_seq_num,
(header->start_idx() + i), group_dev_offset, rec->size);
m_logfound_cb(rec->store_id, rec->store_seq_num, {header->start_idx() + i, group_dev_offset},
flush_ld_key, b, (header->nrecords() - (i + 1)));
}
}
++i;
}
Expand Down Expand Up @@ -585,6 +588,9 @@ uint64_t LogDev::truncate(const logdev_key& key) {
#endif
}

// We can remove the rollback records of those upto which logid is getting truncated
m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */);

m_logdev_meta.persist();
#ifdef _PRERELEASE
if (garbage_collect && homestore_flip->test_flip("logdev_abort_after_garbage")) {
Expand All @@ -602,6 +608,11 @@ void LogDev::update_store_superblk(const logstore_id_t idx, const logstore_super
m_logdev_meta.update_store_superblk(idx, meta, persist_now);
}

void LogDev::rollback(logstore_id_t store_id, logid_range_t id_range) {
std::unique_lock lg{m_meta_mutex};
m_logdev_meta.add_rollback_record(store_id, id_range, true);
}

sisl::status_response LogDev::get_status(const sisl::status_request& request) const {
sisl::status_response response;
response.json["current_log_idx"] = m_log_idx.load(std::memory_order_relaxed);
Expand All @@ -618,14 +629,33 @@ sisl::status_response LogDev::get_status(const sisl::status_request& request) co
}

/////////////////////////////// LogDevMetadata Section ///////////////////////////////////////
LogDevMetadata::LogDevMetadata(const std::string& metablk_name) : m_metablk_name{metablk_name} {}
LogDevMetadata::LogDevMetadata(const std::string& logdev_name) : m_name{logdev_name} {
MetaBlkMgrSI()->register_handler(
m_name,
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) {
logdev_super_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr);

MetaBlkMgrSI()->register_handler(
m_name + "_rollback_sb",
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) {
rollback_super_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr);
}

logdev_superblk* LogDevMetadata::create() {
const auto req_sz{required_sb_size(0)};
auto req_sz{logdev_sb_size_needed(0)};
// TO DO: Might need to address alignment based on data or fast type
m_raw_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz), sisl::buftag::metablk,
MetaBlkMgrSI()->get_align_size());
m_sb = new (m_raw_buf->bytes) logdev_superblk();
m_raw_logdev_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_sb = new (m_raw_logdev_buf->bytes) logdev_superblk();

req_sz = rollback_superblk::size_needed(1);
m_raw_rollback_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_rollback_sb = new (m_raw_rollback_buf->bytes) rollback_superblk();

logstore_superblk* const sb_area{m_sb->get_logstore_superblk()};
std::fill_n(sb_area, store_capacity(), logstore_superblk::default_value());
Expand All @@ -636,22 +666,32 @@ logdev_superblk* LogDevMetadata::create() {
}

void LogDevMetadata::reset() {
m_raw_buf.reset();
m_raw_logdev_buf.reset();
m_sb = nullptr;
m_meta_mgr_cookie = nullptr;
m_logdev_cookie = nullptr;
m_id_reserver.reset();
m_store_info.clear();
}

void LogDevMetadata::meta_buf_found(const sisl::byte_view& buf, void* const meta_cookie) {
m_meta_mgr_cookie = meta_cookie;
m_raw_buf = hs_utils::extract_byte_array(buf, true, MetaBlkMgrSI()->get_align_size());
m_sb = reinterpret_cast< logdev_superblk* >(m_raw_buf->bytes);
void LogDevMetadata::logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
m_logdev_cookie = meta_cookie;
m_raw_logdev_buf = hs_utils::extract_byte_array(buf, true, MetaBlkMgrSI()->get_align_size());
m_sb = reinterpret_cast< logdev_superblk* >(m_raw_logdev_buf->bytes);

HS_REL_ASSERT_EQ(m_sb->get_magic(), logdev_superblk::LOGDEV_SB_MAGIC, "Invalid logdev metablk, magic mismatch");
HS_REL_ASSERT_EQ(m_sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Invalid version of logdev metablk");
}

void LogDevMetadata::rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
m_rollback_cookie = meta_cookie;
m_raw_rollback_buf = hs_utils::extract_byte_array(buf, true, MetaBlkMgrSI()->get_align_size());
m_rollback_sb = reinterpret_cast< rollback_superblk* >(m_raw_rollback_buf->bytes);

HS_REL_ASSERT_EQ(m_rollback_sb->get_magic(), rollback_superblk::ROLLBACK_SB_MAGIC, "Rollback sb magic mismatch");
HS_REL_ASSERT_EQ(m_rollback_sb->get_version(), rollback_superblk::ROLLBACK_SB_VERSION,
"Rollback sb version mismatch");
}

std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::load() {
std::vector< std::pair< logstore_id_t, logstore_superblk > > ret_list;
ret_list.reserve(1024);
Expand All @@ -662,7 +702,7 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa
m_id_reserver = std::make_unique< sisl::IDReserver >();
}

HS_REL_ASSERT_NE(m_raw_buf->bytes, nullptr, "Load called without getting metadata");
HS_REL_ASSERT_NE(m_raw_logdev_buf->bytes, nullptr, "Load called without getting metadata");
HS_REL_ASSERT_LE(m_sb->get_version(), logdev_superblk::LOGDEV_SB_VERSION, "Logdev super blk version mismatch");

const logstore_superblk* const store_sb{m_sb->get_logstore_superblk()};
Expand All @@ -678,6 +718,11 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa
++idx;
}

for (uint32_t i{0}; i < m_rollback_sb->num_records; ++i) {
const auto& rec = m_rollback_sb->at(i);
m_rollback_info.insert({rec.store_id, rec.idx_range});
}

return ret_list;
}

Expand All @@ -689,11 +734,23 @@ void LogDevMetadata::persist() {
}
#endif

if (m_meta_mgr_cookie) {
MetaBlkMgrSI()->update_sub_sb(static_cast< const void* >(m_raw_buf->bytes), m_raw_buf->size, m_meta_mgr_cookie);
if (m_logdev_cookie) {
MetaBlkMgrSI()->update_sub_sb(static_cast< const void* >(m_raw_logdev_buf->bytes), m_raw_logdev_buf->size,
m_logdev_cookie);
} else {
MetaBlkMgrSI()->add_sub_sb(m_metablk_name, static_cast< const void* >(m_raw_buf->bytes), m_raw_buf->size,
m_meta_mgr_cookie);
MetaBlkMgrSI()->add_sub_sb(m_name, static_cast< const void* >(m_raw_logdev_buf->bytes), m_raw_logdev_buf->size,
m_logdev_cookie);
}

if (m_rollback_info_dirty) {
if (m_rollback_cookie) {
MetaBlkMgrSI()->update_sub_sb(static_cast< const void* >(m_raw_rollback_buf->bytes),
m_raw_rollback_buf->size, m_rollback_cookie);
} else {
MetaBlkMgrSI()->add_sub_sb(m_name + "_rollback_sb", static_cast< const void* >(m_raw_rollback_buf->bytes),
m_raw_rollback_buf->size, m_rollback_cookie);
}
m_rollback_info_dirty = false;
}
}

Expand All @@ -715,6 +772,7 @@ logstore_id_t LogDevMetadata::reserve_store(const bool persist_now) {
void LogDevMetadata::unreserve_store(const logstore_id_t idx, const bool persist_now) {
m_id_reserver->unreserve(idx);
m_store_info.erase(idx);
remove_all_rollback_records(idx, persist_now);

resize_if_needed();
if (idx < *m_store_info.rbegin()) {
Expand Down Expand Up @@ -761,29 +819,113 @@ void LogDevMetadata::set_start_dev_offset(const off_t offset, const logid_t key_
logid_t LogDevMetadata::get_start_log_idx() const { return m_sb->key_idx; }

bool LogDevMetadata::resize_if_needed() {
auto req_sz{required_sb_size((m_store_info.size() == 0) ? 0u : *m_store_info.rbegin() + 1)};
auto req_sz{logdev_sb_size_needed((m_store_info.size() == 0) ? 0u : *m_store_info.rbegin() + 1)};
if (MetaBlkMgrSI()->is_aligned_buf_needed(req_sz)) {
req_sz = sisl::round_up(req_sz, MetaBlkMgrSI()->get_align_size());
}
if (req_sz != m_raw_buf->size) {
const auto old_buf{m_raw_buf};
if (req_sz != m_raw_logdev_buf->size) {
const auto old_buf{m_raw_logdev_buf};
// TO DO: Might need to address alignment based on data or fast type
m_raw_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_sb = new (m_raw_buf->bytes) logdev_superblk();
m_raw_logdev_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_sb = new (m_raw_logdev_buf->bytes) logdev_superblk();

logstore_superblk* const sb_area{m_sb->get_logstore_superblk()};
std::fill_n(sb_area, store_capacity(), logstore_superblk::default_value());

std::memcpy(static_cast< void* >(m_raw_buf->bytes), static_cast< const void* >(old_buf->bytes),
std::min(old_buf->size, m_raw_buf->size));
std::memcpy(static_cast< void* >(m_raw_logdev_buf->bytes), static_cast< const void* >(old_buf->bytes),
std::min(old_buf->size, m_raw_logdev_buf->size));
return true;
} else {
return false;
}
}

uint32_t LogDevMetadata::store_capacity() const {
return (m_raw_buf->size - sizeof(logdev_superblk)) / sizeof(logstore_superblk);
return (m_raw_logdev_buf->size - sizeof(logdev_superblk)) / sizeof(logstore_superblk);
}

void LogDevMetadata::add_rollback_record(logstore_id_t store_id, logid_range_t id_range, bool persist_now) {
m_rollback_info.insert({store_id, id_range});
resize_rollback_sb_if_needed();
m_rollback_sb->add_record(store_id, id_range);

if (persist_now) { persist(); }
m_rollback_info_dirty = !persist_now;
}

void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_now) {
uint32_t n_removed{0};
for (auto i = m_rollback_sb->num_records; i > 0; --i) {
auto& rec = m_rollback_sb->at(i - 1);
if (rec.idx_range.second <= upto_id) {
m_rollback_sb->remove_ith_record(i - 1);
++n_removed;
}
}

if (n_removed) {
for (auto it = m_rollback_info.begin(); it != m_rollback_info.end();) {
if (it->second.second <= upto_id) {
it = m_rollback_info.erase(it);
} else {
++it;
}
}
resize_rollback_sb_if_needed();
if (persist_now) { persist(); }
m_rollback_info_dirty = !persist_now;
}
}

void LogDevMetadata::remove_all_rollback_records(logstore_id_t store_id, bool persist_now) {
uint32_t n_removed{0};
for (auto i = m_rollback_sb->num_records; i > 0; --i) {
auto& rec = m_rollback_sb->at(i - 1);
if (rec.store_id == store_id) {
m_rollback_sb->remove_ith_record(i - 1);
++n_removed;
}
}
if (n_removed) {
m_rollback_info.erase(store_id);
resize_rollback_sb_if_needed();
if (persist_now) { persist(); }
m_rollback_info_dirty = !persist_now;
}
}

uint32_t LogDevMetadata::num_rollback_records(logstore_id_t store_id) const {
HS_DBG_ASSERT_EQ(m_rollback_sb->num_records, m_rollback_info.size(),
"Rollback record count mismatch between sb and in-memory");
return m_rollback_info.count(store_id);
}

bool LogDevMetadata::is_rolled_back(logstore_id_t store_id, logid_t logid) const {
auto it_pair = m_rollback_info.equal_range(store_id);
for (auto it = it_pair.first; it != it_pair.second; ++it) {
const logid_range_t& log_id_range = it->second;
if ((logid >= log_id_range.first) && (logid <= log_id_range.second)) { return true; }
}
return false;
}

bool LogDevMetadata::resize_rollback_sb_if_needed() {
auto req_sz = rollback_superblk::size_needed(m_rollback_info.size());
if (MetaBlkMgrSI()->is_aligned_buf_needed(req_sz)) {
req_sz = sisl::round_up(req_sz, MetaBlkMgrSI()->get_align_size());
}

if (req_sz != m_raw_rollback_buf->size) {
auto new_buf = hs_utils::make_byte_array(req_sz, MetaBlkMgrSI()->is_aligned_buf_needed(req_sz),
sisl::buftag::metablk, MetaBlkMgrSI()->get_align_size());
m_rollback_sb = new (new_buf->bytes) rollback_superblk();
std::memcpy(static_cast< void* >(new_buf->bytes), static_cast< const void* >(m_raw_rollback_buf->bytes),
std::min(m_raw_logdev_buf->size, new_buf->size));
m_raw_rollback_buf = new_buf;
return true;
} else {
return false;
}
}
} // namespace homestore
Loading
Loading