Skip to content

Commit

Permalink
Fix device manager load of chunks.
Browse files Browse the repository at this point in the history
Chunks were getting duplicate start offsets during recovery. Add or clean up
logs for debugging. Add test case. Add more logdev for logstore test.
Enable logstore test. Move rollback test to new logdev test.
  • Loading branch information
sanebay committed Feb 22, 2024
1 parent 2f5f94d commit 0ac5ace
Show file tree
Hide file tree
Showing 20 changed files with 504 additions and 254 deletions.
17 changes: 9 additions & 8 deletions .github/workflows/build_dependencies.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ on:
jobs:
BuildHomestoreDeps:
runs-on: ${{ inputs.platform }}
timeout-minutes: 24000
steps:
- name: Retrieve Code
uses: actions/checkout@v3
Expand Down Expand Up @@ -206,18 +207,18 @@ jobs:
fail_on_cache_miss: true
if: ${{ inputs.testing == 'True' && github.event_name != 'pull_request' && steps.restore-cache.outputs.cache-hit != 'true' }}

- uses: actions/checkout@v3
- name: Setup tmate session
uses: mxschmitt/action-tmate@v3
with:
limit-access-to-actor: true
detached: true

- name: Create and Test Package
run: |
sanitize=$([[ "${{ inputs.tooling }}" == "Sanitize" ]] && echo "True" || echo "False")
pre=$([[ "${{ inputs.build-type }}" != "Debug" ]] && echo "-o sisl:prerelease=${{ inputs.prerelease }}" || echo "")
conan create \
${pre} \
-o sisl:malloc_impl=${{ inputs.malloc-impl }} \
-o iomgr:testing=off \
-o homestore:sanitize=${sanitize} \
-s build_type=${{ inputs.build-type }} \
--build missing \
.
sleep 10000000
if: ${{ inputs.testing == 'True' && inputs.tooling != 'Coverage' }}

- name: Code Coverage Run
Expand Down
1 change: 1 addition & 0 deletions src/lib/device/device.h
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ class DeviceManager {

std::vector< PhysicalDev* > get_pdevs_by_dev_type(HSDevType dtype) const;
std::vector< shared< VirtualDev > > get_vdevs() const;
std::vector< shared< Chunk > > get_chunks() const;

uint64_t total_capacity() const;
uint64_t total_capacity(HSDevType dtype) const;
Expand Down
10 changes: 10 additions & 0 deletions src/lib/device/device_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,16 @@ std::vector< shared< VirtualDev > > DeviceManager::get_vdevs() const {
return ret_v;
}

/**
 * @brief Snapshot of the chunks currently tracked in m_chunks.
 *
 * Holds m_vdev_mutex while scanning m_chunks; entries that are null are left out of the result.
 *
 * @return vector with a copy of every non-null chunk pointer, in m_chunks iteration order
 */
std::vector< shared< Chunk > > DeviceManager::get_chunks() const {
    std::unique_lock guard{m_vdev_mutex};

    std::vector< shared< Chunk > > live_chunks;
    // Upper bound only: null slots are skipped below, so fewer entries may be stored.
    live_chunks.reserve(m_chunks.size());
    for (auto& entry : m_chunks) {
        if (!entry) { continue; }
        live_chunks.push_back(entry);
    }
    return live_chunks;
}

// Some of the hs_super_blk details
uint64_t hs_super_blk::vdev_super_block_size() { return (hs_super_blk::MAX_VDEVS_IN_SYSTEM * vdev_info::size); }

Expand Down
25 changes: 17 additions & 8 deletions src/lib/device/journal_vdev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {

// assert that the returned logical offset is within the valid range
HS_DBG_ASSERT_LE(tail_off, m_end_offset);
LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} desc {}", to_hex(tail_off), to_string());
LOGDEBUGMOD(journalvdev, "returned tail_off 0x{} size {} desc {}", to_hex(tail_off), sz, to_string());
return tail_off;
}

Expand Down Expand Up @@ -443,6 +443,14 @@ off_t JournalVirtualDev::Descriptor::dev_offset(off_t nbytes) const {
return vdev_offset;
}

/**
 * @brief Store a new data start offset and recompute the end offset from it.
 *
 * The end offset is the start offset rounded down to a chunk_size boundary plus one chunk_size
 * for every entry in m_journal_chunks. The resulting span is release-asserted to equal m_total_size.
 *
 * @param offset : the new logical data start offset
 */
void JournalVirtualDev::Descriptor::update_data_start_offset(off_t offset) {
    m_data_start_offset = offset;

    auto const chunk_size = m_vdev.info().chunk_size;
    auto aligned_start = sisl::round_down(m_data_start_offset, chunk_size);
    m_end_offset = aligned_start + m_journal_chunks.size() * chunk_size;

    // Trace before asserting so the updated state is logged even when the check below fires.
    LOGTRACEMOD(journalvdev, "Updated data start offset off 0x{} {}", to_hex(offset), to_string());
    RELEASE_ASSERT_EQ(m_end_offset - aligned_start, m_total_size, "offset size mismatch {}", to_string());
}

off_t JournalVirtualDev::Descriptor::tail_offset(bool reserve_space_include) const {
off_t tail = static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed));
if (reserve_space_include) { tail += m_reserved_sz; }
Expand All @@ -461,7 +469,7 @@ void JournalVirtualDev::Descriptor::update_tail_offset(off_t tail) {
}
lseek(tail);

LOGDEBUGMOD(journalvdev, "tail arg 0x{} desc {} ", to_hex(tail), to_string());
LOGDEBUGMOD(journalvdev, "Updated tail offset arg 0x{} desc {} ", to_hex(tail), to_string());
HS_REL_ASSERT(tail_offset() == tail, "tail offset mismatch after calculation 0x{} : {}", tail_offset(), tail);
}

Expand Down Expand Up @@ -598,7 +606,7 @@ bool JournalVirtualDev::Descriptor::is_alloc_accross_chunk(size_t size) const {

nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
nlohmann::json j;
j["logdev_id"] = m_logdev_id;
j["logdev"] = m_logdev_id;
j["seek_cursor"] = m_seek_cursor;
j["data_start_offset"] = m_data_start_offset;
j["end_offset"] = m_end_offset;
Expand All @@ -613,7 +621,7 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
nlohmann::json c;
auto* private_data = r_cast< JournalChunkPrivate* >(const_cast< uint8_t* >(chunk->user_private()));
c["chunk_id"] = chunk->chunk_id();
c["logdev_id"] = private_data->logdev_id;
c["logdev"] = private_data->logdev_id;
c["is_head"] = private_data->is_head;
c["end_of_chunk"] = private_data->end_of_chunk;
c["next_chunk"] = private_data->next_chunk;
Expand All @@ -627,12 +635,13 @@ nlohmann::json JournalVirtualDev::Descriptor::get_status(int log_level) const {
}

std::string JournalVirtualDev::Descriptor::to_string() const {
std::string str{fmt::format("id={};ds=0x{};end=0x{};writesz={};tail=0x{};"
off_t tail =
static_cast< off_t >(data_start_offset() + m_write_sz_in_total.load(std::memory_order_relaxed)) + m_reserved_sz;
std::string str{fmt::format("log_dev={};ds=0x{};end=0x{};writesz={};tail=0x{};"
"rsvdsz={};chunks={};trunc={};total={};seek=0x{} ",
m_logdev_id, to_hex(m_data_start_offset), to_hex(m_end_offset),
m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail_offset()),
m_reserved_sz, m_journal_chunks.size(), m_truncate_done, m_total_size,
to_hex(m_seek_cursor))};
m_write_sz_in_total.load(std::memory_order_relaxed), to_hex(tail), m_reserved_sz,
m_journal_chunks.size(), m_truncate_done, m_total_size, to_hex(m_seek_cursor))};
return str;
}

Expand Down
9 changes: 3 additions & 6 deletions src/lib/device/journal_vdev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -233,12 +233,7 @@ class JournalVirtualDev : public VirtualDev {
*
* @param offset : the start logical offset to be persisted
*/
void update_data_start_offset(off_t offset) {
m_data_start_offset = offset;
auto data_start_offset_aligned = sisl::round_down(m_data_start_offset, m_vdev.info().chunk_size);
m_end_offset = data_start_offset_aligned + m_journal_chunks.size() * m_vdev.info().chunk_size;
RELEASE_ASSERT_EQ(m_end_offset - data_start_offset_aligned, m_total_size, "offset size mismatch");
}
void update_data_start_offset(off_t offset);

/**
* @brief : get the logical tail offset;
Expand Down Expand Up @@ -309,6 +304,8 @@ class JournalVirtualDev : public VirtualDev {
*/
nlohmann::json get_status(int log_level) const;

/**
 * @brief : get the logdev id stored in this descriptor;
 *
 * @return : the value of m_logdev_id
 */
logdev_id_t logdev_id() const { return m_logdev_id; }

std::string to_string() const;

private:
Expand Down
8 changes: 7 additions & 1 deletion src/lib/device/physical_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -247,12 +247,13 @@ std::vector< shared< Chunk > > PhysicalDev::create_chunks(const std::vector< uin
auto chunk = std::make_shared< Chunk >(this, *cinfo, cslot);
// Log while `chunk` still owns the object: the insert below moves from it, leaving it null,
// so dereferencing it afterwards (as the log statement previously did) is undefined behavior.
HS_LOG(DEBUG, device, "Creating chunk {}", chunk->to_string());
ret_chunks.push_back(chunk);
get_stream(chunk).m_chunks_map.insert(std::pair{chunk_ids[cit], std::move(chunk)});

// cinfo was used to construct the Chunk above; run its destructor explicitly now that it is no longer needed.
cinfo->~chunk_info();
}

m_chunk_info_slots->set_bits(b.start_bit, b.nbits);
write_super_block(buf, chunk_info::size * b.nbits, chunk_info_offset_nth(b.start_bit));

hs_utils::iobuf_free(buf, sisl::buftag::superblk);

chunks_remaining -= b.nbits;
Expand Down Expand Up @@ -296,6 +297,7 @@ shared< Chunk > PhysicalDev::create_chunk(uint32_t chunk_id, uint32_t vdev_id, u

auto bitmap_mem = m_chunk_info_slots->serialize(m_pdev_info.dev_attr.align_size);
write_super_block(bitmap_mem->cbytes(), bitmap_mem->size(), hs_super_blk::chunk_sb_offset());
HS_LOG(TRACE, device, "Created chunk {}", chunk->to_string());

cinfo->~chunk_info();
hs_utils::iobuf_free(buf, sisl::buftag::superblk);
Expand Down Expand Up @@ -397,6 +399,7 @@ void PhysicalDev::do_remove_chunk(cshared< Chunk >& chunk) {
get_stream(chunk).m_chunks_map.erase(chunk->chunk_id());
cinfo->~chunk_info();
hs_utils::iobuf_free(buf, sisl::buftag::superblk);
HS_LOG(TRACE, device, "Removed chunk {}", chunk->to_string());
}

uint64_t PhysicalDev::chunk_info_offset_nth(uint32_t slot) const {
Expand All @@ -418,11 +421,14 @@ void PhysicalDev::populate_chunk_info(chunk_info* cinfo, uint32_t vdev_id, uint6
cinfo->set_allocated();
cinfo->set_user_private(private_data);
cinfo->compute_checksum();
auto [_, inserted] = m_chunk_start.insert(cinfo->chunk_start_offset);
RELEASE_ASSERT(inserted, "Duplicate start offset {} for chunk {}", cinfo->chunk_start_offset, cinfo->chunk_id);
}

void PhysicalDev::free_chunk_info(chunk_info* cinfo) {
auto ival = ChunkInterval::right_open(cinfo->chunk_start_offset, cinfo->chunk_start_offset + cinfo->chunk_size);
m_chunk_data_area.erase(ival);
m_chunk_start.erase(cinfo->chunk_start_offset);

cinfo->set_free();
cinfo->checksum = 0;
Expand Down
1 change: 1 addition & 0 deletions src/lib/device/physical_dev.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ class PhysicalDev {
ChunkIntervalSet m_chunk_data_area; // Range of chunks data area created
std::unique_ptr< sisl::Bitset > m_chunk_info_slots; // Slots to write the chunk info
uint32_t m_chunk_sb_size{0}; // Total size of the chunk sb at present
std::unordered_set< uint64_t > m_chunk_start; // Store and verify start offset of all chunks for debugging.

public:
PhysicalDev(const dev_info& dinfo, int oflags, const pdev_info_header& pinfo);
Expand Down
68 changes: 40 additions & 28 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,7 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) {
m_last_flush_idx = m_log_idx - 1;
}

m_flush_timer_hdl = iomanager.schedule_global_timer(
HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000, true, nullptr /* cookie */,
iomgr::reactor_regex::all_worker,
[this](void*) {
if (m_pending_flush_size.load() && !m_is_flushing.load(std::memory_order_relaxed)) { flush_if_needed(); }
},
true /* wait_to_schedule */);

start_timer();
handle_unopened_log_stores(format);

{
Expand Down Expand Up @@ -132,8 +125,7 @@ void LogDev::stop() {
m_block_flush_q_cv.wait(lk, [&] { return m_stopped; });
}

// cancel the timer
iomanager.cancel_timer(m_flush_timer_hdl, true);
stop_timer();

{
folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx);
Expand All @@ -159,6 +151,21 @@ void LogDev::stop() {
m_hs.reset();
}

/**
 * @brief Schedule the global flush timer and remember its handle in m_flush_timer_hdl.
 *
 * On each tick the callback calls flush_if_needed(), but only when there is pending flush data
 * and no flush is already in progress.
 */
void LogDev::start_timer() {
    // Configured value is in microseconds by name; scaled by 1000 here (presumably to nanoseconds, confirm
    // against the iomanager timer API).
    auto const timer_interval = HS_DYNAMIC_CONFIG(logstore.flush_timer_frequency_us) * 1000;

    m_flush_timer_hdl = iomanager.schedule_global_timer(
        timer_interval, true, nullptr /* cookie */, iomgr::reactor_regex::all_worker,
        [this](void*) {
            // Nothing queued for flush: skip this tick.
            if (!m_pending_flush_size.load()) { return; }
            // A flush is already running: skip this tick.
            if (m_is_flushing.load(std::memory_order_relaxed)) { return; }
            flush_if_needed();
        },
        true /* wait_to_schedule */);
}

/**
 * @brief Cancel the flush timer whose handle is stored in m_flush_timer_hdl.
 *
 * Counterpart of start_timer(), which sets m_flush_timer_hdl.
 */
void LogDev::stop_timer() {
// cancel the timer
// NOTE(review): the second argument presumably makes the cancel wait for completion; confirm against iomanager.
iomanager.cancel_timer(m_flush_timer_hdl, true);
}

void LogDev::do_load(const off_t device_cursor) {
log_stream_reader lstream{device_cursor, m_vdev, m_vdev_jd, m_flush_size_multiple};
logid_t loaded_from{-1};
Expand Down Expand Up @@ -258,11 +265,17 @@ log_buffer LogDev::read(const logdev_key& key, serialized_log_record& return_rec
m_vdev_jd->sync_pread(buf->bytes(), initial_read_size, key.dev_offset);

auto* header = r_cast< const log_group_header* >(buf->cbytes());
HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch!");
HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch!");
HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx");
HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx, "log key offset does not match with log_idx");
HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group");
THIS_LOGDEV_LOG(TRACE, "Logdev read log group header {}", *header);
HS_REL_ASSERT_EQ(header->magic_word(), LOG_GROUP_HDR_MAGIC, "Log header corrupted with magic mismatch! {} {}",
m_logdev_id, *header);
HS_REL_ASSERT_EQ(header->get_version(), log_group_header::header_version, "Log header version mismatch! {} {}",
m_logdev_id, *header);
// The log group read at key.dev_offset must start at or before the requested log index.
// The message previously contained a stray '}' ("{} }{}"), which is not a valid format string.
HS_REL_ASSERT_LE(header->start_idx(), key.idx, "log key offset does not match with log_idx {} {}", m_logdev_id,
                 *header);
HS_REL_ASSERT_GT((header->start_idx() + header->nrecords()), key.idx,
"log key offset does not match with log_idx {} {}", m_logdev_id, *header);
HS_LOG_ASSERT_GE(header->total_size(), header->_inline_data_offset(), "Inconsistent size data in log group {} {}",
m_logdev_id, *header);

// We can only do crc match in read if we have read all the blocks. We don't want to aggressively read more data
// than we need to just to compare CRC for read operation. It can be done during recovery.
Expand Down Expand Up @@ -337,7 +350,7 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) {
}
});

lg->finish(get_prev_crc());
lg->finish(m_logdev_id, get_prev_crc());
if (sisl_unlikely(flushing_upto_idx == -1)) { return nullptr; }
lg->m_flush_log_idx_from = m_last_flush_idx + 1;
lg->m_flush_log_idx_upto = flushing_upto_idx;
Expand All @@ -346,7 +359,6 @@ LogGroup* LogDev::prepare_flush(const int32_t estimated_records) {
HS_DBG_ASSERT_GT(lg->header()->oob_data_offset, 0);

THIS_LOGDEV_LOG(DEBUG, "Flushing upto log_idx={}", flushing_upto_idx);
THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg);
return lg;
}

Expand Down Expand Up @@ -408,7 +420,7 @@ bool LogDev::flush_if_needed(int64_t threshold_size) {
lg->m_log_dev_offset = offset;
HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full");
THIS_LOGDEV_LOG(TRACE, "Flush prepared, flushing data size={} at offset={}", lg->actual_data_size(), offset);

THIS_LOGDEV_LOG(DEBUG, "Log Group: {}", *lg);
do_flush(lg);
return true;
} else {
Expand Down Expand Up @@ -540,14 +552,14 @@ void LogDev::unlock_flush(bool do_flush) {
uint64_t LogDev::truncate(const logdev_key& key) {
HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx);
uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx);
LOGINFO("LogDev::truncate {}", num_records_to_truncate);
THIS_LOGDEV_LOG(INFO, "LogDev::truncate {}", num_records_to_truncate);
if (num_records_to_truncate > 0) {
HS_PERIODIC_LOG(INFO, logstore,
"Truncating log device upto logdev {} log_id={} vdev_offset={} truncated {} log records",
m_logdev_id, key.idx, key.dev_offset, num_records_to_truncate);
m_log_records->truncate(key.idx);
m_vdev_jd->truncate(key.dev_offset);
LOGINFO("LogDev::truncate {}", key.idx);
THIS_LOGDEV_LOG(INFO, "LogDev::truncate {}", key.idx);
m_last_truncate_idx = key.idx;

{
Expand All @@ -574,11 +586,11 @@ uint64_t LogDev::truncate(const logdev_key& key) {

// We can remove the rollback records of those upto which logid is getting truncated
m_logdev_meta.remove_rollback_record_upto(key.idx, false /* persist_now */);
LOGINFO("LogDev::truncate remove rollback {}", key.idx);
THIS_LOGDEV_LOG(INFO, "LogDev::truncate remove rollback {}", key.idx);
m_logdev_meta.persist();
#ifdef _PRERELEASE
if (garbage_collect && iomgr_flip::instance()->test_flip("logdev_abort_after_garbage")) {
LOGINFO("logdev aborting after unreserving garbage ids");
THIS_LOGDEV_LOG(INFO, "logdev aborting after unreserving garbage ids");
raise(SIGKILL);
}
#endif
Expand Down Expand Up @@ -633,7 +645,7 @@ std::shared_ptr< HomeLogStore > LogDev::create_new_log_store(bool append_mode) {
HS_REL_ASSERT((it == m_id_logstore_map.end()), "store_id {}-{} already exists", m_logdev_id, store_id);
m_id_logstore_map.insert(std::pair(store_id, logstore_info{.log_store = lstore, .append_mode = append_mode}));
}
LOGINFO("Created log store id {}-{}", m_logdev_id, store_id);
LOGINFO("Created log store log_dev={} log_store={}", m_logdev_id, store_id);
return lstore;
}

Expand All @@ -653,7 +665,7 @@ folly::Future< shared< HomeLogStore > > LogDev::open_log_store(logstore_id_t sto
}

void LogDev::remove_log_store(logstore_id_t store_id) {
LOGINFO("Removing log store id {}-{}", m_logdev_id, store_id);
LOGINFO("Removing log_dev={} log_store={}", m_logdev_id, store_id);
{
folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx);
auto ret = m_id_logstore_map.erase(store_id);
Expand Down Expand Up @@ -695,8 +707,8 @@ void LogDev::on_log_store_found(logstore_id_t store_id, const logstore_superblk&
return;
}

LOGINFO("Found a logstore store_id={}-{} with start seq_num={}, Creating a new HomeLogStore instance", m_logdev_id,
store_id, sb.m_first_seq_num);
LOGINFO("Found a logstore log_dev={} log_store={} with start seq_num={}, Creating a new HomeLogStore instance",
m_logdev_id, store_id, sb.m_first_seq_num);
logstore_info& info = it->second;
info.log_store =
std::make_shared< HomeLogStore >(shared_from_this(), store_id, info.append_mode, sb.m_first_seq_num);
Expand Down Expand Up @@ -1055,7 +1067,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n
uint32_t n_removed{0};
for (auto i = m_rollback_sb->num_records; i > 0; --i) {
auto& rec = m_rollback_sb->at(i - 1);
LOGINFO("Removing record sb {} {}", rec.idx_range.second, upto_id);
HS_LOG(TRACE, logstore, "Removing record sb {} {}", rec.idx_range.second, upto_id);
if (rec.idx_range.second <= upto_id) {
m_rollback_sb->remove_ith_record(i - 1);
++n_removed;
Expand All @@ -1064,7 +1076,7 @@ void LogDevMetadata::remove_rollback_record_upto(logid_t upto_id, bool persist_n

if (n_removed) {
for (auto it = m_rollback_info.begin(); it != m_rollback_info.end();) {
LOGINFO("Removing info {} {}", it->second.second, upto_id);
HS_LOG(TRACE, logstore, "Removing info {} {}", it->second.second, upto_id);
if (it->second.second <= upto_id) {
it = m_rollback_info.erase(it);
} else {
Expand Down
Loading

0 comments on commit 0ac5ace

Please sign in to comment.