Skip to content

Commit

Permalink
Fix log stream reader.
Browse files Browse the repository at this point in the history
  • Loading branch information
sanebay committed Feb 1, 2024
1 parent 4da2de4 commit fb5853e
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 28 deletions.
4 changes: 2 additions & 2 deletions src/lib/device/virtual_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,8 @@ void VirtualDev::add_chunk(cshared< Chunk >& chunk, bool is_fresh_chunk) {

void VirtualDev::remove_chunk(cshared< Chunk >& chunk) {
std::unique_lock lg{m_mgmt_mutex};
auto iter = std::remove_if(m_all_chunks.begin(), m_all_chunks.end(), [chunk](auto c) { return c == chunk; });
m_all_chunks.erase(iter, m_all_chunks.end());
m_all_chunks[chunk->chunk_id()].reset();
m_all_chunks[chunk->chunk_id()] = nullptr;
m_chunk_selector->remove_chunk(chunk);
}

Expand Down
26 changes: 18 additions & 8 deletions src/lib/logstore/log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ void LogDev::start(bool format, JournalVirtualDev* vdev) {
m_store_found_cb(spair.first, spair.second);
}

THIS_LOGDEV_LOG(INFO, "get start vdev offset during recovery {} log indx {} ",
m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx());
LOGINFO("get start vdev offset during recovery {} {} log indx {} ", m_family_id,
m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx());

m_vdev_jd->update_data_start_offset(m_logdev_meta.get_start_dev_offset());
m_log_idx = m_logdev_meta.get_start_log_idx();
Expand Down Expand Up @@ -151,26 +151,34 @@ void LogDev::stop() {
void LogDev::do_load(const off_t device_cursor) {
log_stream_reader lstream{device_cursor, m_vdev, m_vdev_jd, m_flush_size_multiple};
logid_t loaded_from{-1};

LOGINFO("LogDev::do_load start {} ", m_family_id);
off_t group_dev_offset;
do {
const auto buf = lstream.next_group(&group_dev_offset);
if (buf.size() == 0) {
LOGINFO("{} LogDev loaded log_idx in range of[{} - {}] ", m_family_id, loaded_from, m_log_idx - 1);
assert_next_pages(lstream);
THIS_LOGDEV_LOG(INFO, "LogDev loaded log_idx in range of [{} - {}]", loaded_from, m_log_idx - 1);
break;
}

auto* header = r_cast< const log_group_header* >(buf.bytes());
if (loaded_from == -1 && header->start_idx() < m_log_idx) {
LOGINFO("{} LogDev loaded log_idx in range of[{} - {}] ", m_family_id, loaded_from, m_log_idx - 1);

// log dev is truncated completely
assert_next_pages(lstream);
THIS_LOGDEV_LOG(INFO, "LogDev loaded log_idx in range of [{} - {}]", loaded_from, m_log_idx - 1);
break;
}

LOGINFO("{} group_dev_offset {} loaded_from {} m_log_idx {} header {} ", m_family_id, group_dev_offset,
loaded_from, m_log_idx.load(), *header);
HS_REL_ASSERT_EQ(header->start_idx(), m_log_idx.load(), "log indx is not the expected one");
if (loaded_from == -1) { loaded_from = header->start_idx(); }
if (loaded_from == -1) {
loaded_from = header->start_idx();
LOGINFO("m_family_id {} {} ", m_family_id, header->start_idx());
}

// Loop through each record within the log group and do a callback
decltype(header->nrecords()) i{0};
Expand Down Expand Up @@ -201,13 +209,15 @@ void LogDev::do_load(const off_t device_cursor) {
}
++i;
}
LOGINFO(" {} group_dev_offset {} m_log_idx {} i {} ", m_family_id, group_dev_offset, m_log_idx.load(), i);
m_log_idx = header->start_idx() + i;
m_last_crc = header->cur_grp_crc;
} while (true);

// Update the tail offset with where we finally end up loading, so that new append entries can be written from
// here.
m_vdev_jd->update_tail_offset(group_dev_offset);
LOGINFO("LogDev::do_load end {} ", m_family_id);
}

void LogDev::assert_next_pages(log_stream_reader& lstream) {
Expand All @@ -220,8 +230,8 @@ void LogDev::assert_next_pages(log_stream_reader& lstream) {
auto* header = r_cast< const log_group_header* >(buf.bytes());
HS_REL_ASSERT_GT(m_log_idx.load(std::memory_order_acquire), header->start_idx(),
"Found a header with future log_idx after reaching end of log. Hence rbuf which was read "
"must have been corrupted, Header: {}",
*header);
"must have been corrupted, Family {} Header: {}",
m_family_id, *header);
}
}
}
Expand Down Expand Up @@ -528,8 +538,8 @@ uint64_t LogDev::truncate(const logdev_key& key) {
HS_DBG_ASSERT_GE(key.idx, m_last_truncate_idx);
uint64_t const num_records_to_truncate = static_cast< uint64_t >(key.idx - m_last_truncate_idx);
if (num_records_to_truncate > 0) {
HS_PERIODIC_LOG(INFO, logstore, "Truncating log device upto log_id={} vdev_offset={} truncated {} log records",
key.idx, key.dev_offset, num_records_to_truncate);
LOGINFO("Truncating log device upto {} log_id={} vdev_offset={} truncated {} log records", m_family_id, key.idx,
key.dev_offset, num_records_to_truncate);
m_log_records->truncate(key.idx);
m_vdev_jd->truncate(key.dev_offset);
m_last_truncate_idx = key.idx;
Expand Down
18 changes: 7 additions & 11 deletions src/lib/logstore/log_store_family.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ void LogStoreFamily::device_truncate(const std::shared_ptr< truncate_req >& treq
}
if (done) {
if (treq->cb) { treq->cb(treq->m_trunc_upto_result); }
if (treq->wait_till_done) {
std::lock_guard< std::mutex > lk{treq->mtx};
treq->cv.notify_one();
}
if (treq->wait_till_done) { treq->cv.notify_one(); }
}
m_log_dev.unlock_flush();
});
Expand Down Expand Up @@ -235,6 +232,7 @@ logdev_key LogStoreFamily::do_device_truncate(bool dry_run) {
m_non_participating_stores.clear();
logdev_key min_safe_ld_key = logdev_key::out_of_bound_ld_key();

LOGINFO("do_device_truncate on family {}", m_family_id);
std::string dbg_str{"Format [store_id:trunc_lsn:logidx:dev_trunc_pending?:active_writes_in_trucate?] "};

{
Expand Down Expand Up @@ -266,18 +264,16 @@ logdev_key LogStoreFamily::do_device_truncate(bool dry_run) {
}

if ((min_safe_ld_key == logdev_key::out_of_bound_ld_key()) || (min_safe_ld_key.idx < 0)) {
HS_PERIODIC_LOG(
INFO, logstore,
"[Family={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>",
m_family_id, dbg_str);
LOGINFO("[Family={}] No log store append on any log stores, skipping device truncation, all_logstore_info:<{}>",
m_family_id, dbg_str);
return min_safe_ld_key;
}

LOGINFO("do_device_truncate on family {}", m_family_id);
// Got the safest log id to truncate and actually truncate upto the safe log idx to the log device
if (!dry_run) { m_log_dev.truncate(min_safe_ld_key); }
HS_PERIODIC_LOG(INFO, logstore,
"[Family={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}",
m_family_id, dbg_str, min_safe_ld_key);
LOGINFO("[Family={}] LogDevice truncate, all_logstore_info:<{}> safe log dev key to truncate={}", m_family_id,
dbg_str, min_safe_ld_key);

// We call post device truncation only to the log stores whose prepared truncation points are fully
// truncated or to stores which didn't participate in this device truncation.
Expand Down
4 changes: 0 additions & 4 deletions src/lib/logstore/log_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,6 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) {
if (m_cur_log_buf.size() < min_needed) {
do {
m_cur_log_buf = read_next_bytes(std::max(min_needed, bulk_read_size));
if (m_cur_log_buf.size() == 0) {
LOGINFOMOD(logstore, "Logdev data empty");
return {};
}
} while (m_cur_log_buf.size() < sizeof(log_group_header));
min_needed = 0;
}
Expand Down
6 changes: 3 additions & 3 deletions src/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ if (${io_tests})
add_test(NAME MetaBlkMgr-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_meta_blk_mgr)
add_test(NAME DataService-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_data_service)

#add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
# add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore)
# add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev)
add_test(NAME SoloReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_solo_repl_dev)
add_test(NAME HomeRaftLogStore-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_home_raft_logstore)
add_test(NAME RaftReplDev-Epoll COMMAND ${CMAKE_BINARY_DIR}/bin/test_raft_repl_dev)
endif()

can_build_spdk_io_tests(spdk_tests)
Expand Down
5 changes: 5 additions & 0 deletions src/tests/test_solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,12 @@ SISL_OPTION_GROUP(test_solo_repl_dev,
int main(int argc, char* argv[]) {
int parsed_argc{argc};
::testing::InitGoogleTest(&parsed_argc, argv);

SISL_OPTIONS_LOAD(parsed_argc, argv, logging, test_solo_repl_dev, iomgr, test_common_setup);
// sisl::logging::SetModuleLogLevel("logstore", spdlog::level::level_enum::trace);
// sisl::logging::SetModuleLogLevel("device", spdlog::level::level_enum::trace);
// sisl::logging::SetModuleLogLevel("journalvdev", spdlog::level::level_enum::trace);

sisl::logging::SetLogger("test_solo_repl_dev");
spdlog::set_pattern("[%D %T%z] [%^%l%$] [%n] [%t] %v");

Expand Down

0 comments on commit fb5853e

Please sign in to comment.