Skip to content

Commit

Permalink
Fix raft repl test case for destroying lodev and remove group.
Browse files Browse the repository at this point in the history
Fixes a race condition in the logdev metablk callbacks. Deletion
of unopened logdevs now starts only after all logdevs are loaded.
The test waits until the logdev is permanently destroyed.
  • Loading branch information
sanebay committed Jun 14, 2024
1 parent 718d169 commit be6234a
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 19 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.17"
version = "6.4.18"
homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
topics = ("ebay", "nublox")
Expand Down
19 changes: 17 additions & 2 deletions src/include/homestore/logstore_service.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

#include <homestore/homestore_decl.hpp>
#include <homestore/logstore/log_store.hpp>
#include <homestore/superblk_handler.hpp>

namespace homestore {

Expand All @@ -50,6 +51,17 @@ struct vdev_info;
struct log_dump_req;
struct logdev_superblk;

static constexpr uint64_t logstore_service_sb_magic{0xb0b0c01b};
static constexpr uint32_t logstore_service_sb_version{0x1};

#pragma pack(1)
// Persistent superblock for the LogStoreService as a whole (distinct from the
// per-logdev superblk). Holds the last logdev id handed out so that ids stay
// unique across restarts. Packed so the on-disk layout is exact and stable.
struct logstore_service_super_block {
    uint64_t magic{logstore_service_sb_magic};     // identifies this metablk; validated on load
    uint32_t version{logstore_service_sb_version}; // on-disk format version; validated on load
    uint32_t m_last_logdev_id{0};                  // highest logdev id allocated so far
};
#pragma pack()

class LogStoreService {
friend class HomeLogStore;
friend class LogDev;
Expand Down Expand Up @@ -173,24 +185,27 @@ class LogStoreService {
*/
iomgr::io_fiber_t truncate_thread() { return m_truncate_fiber; }

void delete_unopened_logdevs();

private:
std::shared_ptr< LogDev > create_new_logdev_internal(logdev_id_t logdev_id);
void delete_unopened_logdevs();
void on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie);
logdev_id_t get_next_logdev_id();
void logdev_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
void rollback_super_blk_found(const sisl::byte_view& buf, void* meta_cookie);
void start_threads();
void flush_if_needed();

private:
std::unordered_map< logdev_id_t, std::shared_ptr< LogDev > > m_id_logdev_map;
std::unique_ptr< sisl::IDReserver > m_id_reserver;
folly::SharedMutexWritePriority m_logdev_map_mtx;

std::shared_ptr< JournalVirtualDev > m_logdev_vdev;
iomgr::io_fiber_t m_truncate_fiber;
iomgr::io_fiber_t m_flush_fiber;
LogStoreServiceMetrics m_metrics;
std::unordered_set< logdev_id_t > m_unopened_logdev;
superblk< logstore_service_super_block > m_sb;
};

extern LogStoreService& logstore_service();
Expand Down
4 changes: 4 additions & 0 deletions src/lib/device/journal_vdev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ void JournalVirtualDev::destroy(logdev_id_t logdev_id) {
void JournalVirtualDev::Descriptor::append_chunk() {
// Get a new chunk from the pool.
auto new_chunk = m_vdev.m_chunk_pool->dequeue();
#if 0
auto* pdev = new_chunk->physical_dev_mutable();
pdev->sync_write_zero(new_chunk->size(), new_chunk->start_offset());
#endif

// Increase the right window and total size.
m_total_size += new_chunk->size();
Expand Down
38 changes: 29 additions & 9 deletions src/lib/logstore/log_store_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ SISL_LOGGING_DECL(logstore)
LogStoreService& logstore_service() { return hs()->logstore_service(); }

/////////////////////////////////////// LogStoreService Section ///////////////////////////////////////
LogStoreService::LogStoreService() {
m_id_reserver = std::make_unique< sisl::IDReserver >();
LogStoreService::LogStoreService() : m_sb{"LogStoreServiceSB"} {
meta_service().register_handler(
logdev_sb_meta_name,
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) {
Expand All @@ -52,6 +51,17 @@ LogStoreService::LogStoreService() {
rollback_super_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true, std::optional< meta_subtype_vec_t >({logdev_sb_meta_name}));

meta_service().register_handler(
"LogStoreServiceSB",
[this](meta_blk* mblk, sisl::byte_view buf, size_t size) { on_meta_blk_found(std::move(buf), (void*)mblk); },
nullptr);
}

// Meta-service recovery callback for the "LogStoreServiceSB" metablk.
// Loads the service superblock into m_sb and validates its magic and version
// before any other code (e.g. get_next_logdev_id) reads it.
void LogStoreService::on_meta_blk_found(const sisl::byte_view& buf, void* meta_cookie) {
m_sb.load(buf, meta_cookie);
// Fail hard on corruption or an unrecognized on-disk format rather than
// risking duplicate logdev-id allocation later.
HS_REL_ASSERT_EQ(m_sb->magic, logstore_service_sb_magic, "Invalid log service metablk, magic mismatch");
HS_REL_ASSERT_EQ(m_sb->version, logstore_service_sb_version, "Invalid version of log service metablk");
}

folly::Future< std::error_code > LogStoreService::create_vdev(uint64_t size, HSDevType devType, uint32_t chunk_size) {
Expand Down Expand Up @@ -96,8 +106,10 @@ std::shared_ptr< VirtualDev > LogStoreService::open_vdev(const vdev_info& vinfo,

void LogStoreService::start(bool format) {
// hs()->status_mgr()->register_status_cb("LogStore", bind_this(LogStoreService::get_status, 1));

delete_unopened_logdevs();
if (format) {
m_sb.create(sizeof(logstore_service_super_block));
m_sb.write();
}

// Create an truncate thread loop which handles truncation which does sync IO
start_threads();
Expand All @@ -116,12 +128,17 @@ void LogStoreService::stop() {
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
m_id_logdev_map.clear();
}
m_id_reserver.reset();
}

// Allocate the next logdev id from the persisted counter in the service
// superblock. The superblock is flushed immediately so the allocation
// survives a restart and the same id is never handed out twice.
// NOTE(review): callers appear to hold m_logdev_map_mtx (see create_new_logdev);
// this function itself does no locking — confirm before adding new call sites.
logdev_id_t LogStoreService::get_next_logdev_id() {
    m_sb->m_last_logdev_id += 1;
    const auto next_id = m_sb->m_last_logdev_id;
    m_sb.write();
    return next_id;
}

logdev_id_t LogStoreService::create_new_logdev() {
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
logdev_id_t logdev_id = m_id_reserver->reserve();
logdev_id_t logdev_id = get_next_logdev_id();
auto logdev = create_new_logdev_internal(logdev_id);
logdev->start(true /* format */);
COUNTER_INCREMENT(m_metrics, logdevs_count, 1);
Expand All @@ -132,7 +149,10 @@ logdev_id_t LogStoreService::create_new_logdev() {
void LogStoreService::destroy_log_dev(logdev_id_t logdev_id) {
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
const auto it = m_id_logdev_map.find(logdev_id);
if (it == m_id_logdev_map.end()) { return; }
if (it == m_id_logdev_map.end()) {
LOGERROR("Logdev not found to destroy {}", logdev_id);
return;
}

// Stop the logdev and release all the chunks from the journal vdev.
auto& logdev = it->second;
Expand Down Expand Up @@ -172,7 +192,6 @@ void LogStoreService::open_logdev(logdev_id_t logdev_id) {
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
const auto it = m_id_logdev_map.find(logdev_id);
if (it == m_id_logdev_map.end()) {
m_id_reserver->reserve(logdev_id);
auto logdev = std::make_shared< LogDev >(logdev_id, m_logdev_vdev.get());
m_id_logdev_map.emplace(logdev_id, logdev);
LOGDEBUGMOD(logstore, "log_dev={} does not exist, created!", logdev_id);
Expand Down Expand Up @@ -206,11 +225,11 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m
folly::SharedMutexWritePriority::WriteHolder holder(m_logdev_map_mtx);
std::shared_ptr< LogDev > logdev;
auto id = sb->logdev_id;
HS_LOG(DEBUG, logstore, "Log dev superblk found logdev={}", id);
const auto it = m_id_logdev_map.find(id);
// We could update the logdev map either with logdev or rollback superblks found callbacks.
if (it != m_id_logdev_map.end()) {
logdev = it->second;
HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={}", id);
} else {
logdev = std::make_shared< LogDev >(id, m_logdev_vdev.get());
m_id_logdev_map.emplace(id, logdev);
Expand All @@ -219,6 +238,7 @@ void LogStoreService::logdev_super_blk_found(const sisl::byte_view& buf, void* m
// m_unopened_logdev. so that when we start log service, all the left items in m_unopened_logdev are those
// not open, which can be destroyed
m_unopened_logdev.insert(id);
HS_LOG(DEBUG, logstore, "Log dev superblk found log_dev={} added to unopened list", id);
}

logdev->log_dev_meta().logdev_super_blk_found(buf, meta_cookie);
Expand Down
10 changes: 10 additions & 0 deletions src/lib/logstore/log_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ sisl::byte_view log_stream_reader::next_group(off_t* out_dev_offset) {
return ret_buf;
}

if (header->logdev_id != m_vdev_jd->logdev_id()) {
LOGINFOMOD(logstore, "Entries found for different logdev {} at pos {}, must have come to end of log_dev={}",
header->logdev_id, m_vdev_jd->dev_offset(m_cur_read_bytes), m_vdev_jd->logdev_id());
*out_dev_offset = m_vdev_jd->dev_offset(m_cur_read_bytes);
// move it by dma boundary if header is not valid
m_prev_crc = 0;
m_cur_read_bytes += m_read_size_multiple;
return ret_buf;
}

// Because reuse chunks without cleaning up, we could get chunks used by other logdev's
// and it can happen that log group headers couldnt match. In that case check we dont error
// if its the last chunk or not with is_offset_at_last_chunk else raise assert.
Expand Down
7 changes: 6 additions & 1 deletion src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ namespace homestore {

uint64_t ReplLogStore::append(nuraft::ptr< nuraft::log_entry >& entry) {
// We don't want to transform anything that is not an app log
if (entry->get_val_type() != nuraft::log_val_type::app_log) { return HomeRaftLogStore::append(entry); }
if (entry->get_val_type() != nuraft::log_val_type::app_log) {
ulong lsn = HomeRaftLogStore::append(entry);
RD_LOGD("append entry term={}, log_val_type={} lsn={} size={}", entry->get_term(),
static_cast< uint32_t >(entry->get_val_type()), lsn, entry->get_buf().size());
return lsn;
}

repl_req_ptr_t rreq = m_sm.localize_journal_entry_finish(*entry);
ulong lsn = HomeRaftLogStore::append(entry);
Expand Down
5 changes: 5 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
if (err != ReplServiceError::OK) {
m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::ACTIVE; });
return folly::makeSemiFuture< ReplServiceError >(std::move(err));
LOGERROR("RaftReplDev::destroy_group failed {}", err);
}

return m_destroy_promise.getSemiFuture();
Expand Down Expand Up @@ -896,6 +897,7 @@ nuraft::ptr< nuraft::log_store > RaftReplDev::load_log_store() { return m_data_j
int32_t RaftReplDev::server_id() { return m_raft_server_id; }

// True once a destroy has been recorded in the repl-dev superblock (set in
// leave()) but permanent destruction has not necessarily happened yet.
bool RaftReplDev::is_destroy_pending() const { return (m_rd_sb->destroy_pending == 0x1); }
// True only after permanent_destroy() has completed; tests poll this to wait
// for the repl dev (and its logdev) to be fully gone.
bool RaftReplDev::is_destroyed() const { return (*m_stage.access().get() == repl_dev_stage_t::PERMANENT_DESTROYED); }

/////////////////////////////////// nuraft_mesg::mesg_state_mgr overrides ////////////////////////////////////
void RaftReplDev::become_ready() {
Expand All @@ -907,10 +909,12 @@ uint32_t RaftReplDev::get_logstore_id() const { return m_data_journal->logstore_
std::shared_ptr< nuraft::state_machine > RaftReplDev::get_state_machine() { return m_state_machine; }

// Permanently remove every piece of persistent state owned by this repl dev:
// its superblock, the raft config superblock, the data-journal log store and
// the backing logdev. Statement order is deliberate; do not reorder.
void RaftReplDev::permanent_destroy() {
RD_LOGI("Permanent destroy for raft repl dev");
m_rd_sb.destroy();
m_raft_config_sb.destroy();
m_data_journal->remove_store();
logstore_service().destroy_log_dev(m_data_journal->logdev_id());
// Stage update must come last: is_destroyed() keys off PERMANENT_DESTROYED,
// so observers must not see it until all on-disk state is actually gone.
m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::PERMANENT_DESTROYED; });
}

void RaftReplDev::leave() {
Expand All @@ -927,6 +931,7 @@ void RaftReplDev::leave() {
m_rd_sb->destroy_pending = 0x1;
m_rd_sb.write();

RD_LOGI("RaftReplDev leave group");
m_destroy_promise.setValue(ReplServiceError::OK); // In case proposer is waiting for the destroy to complete
}

Expand Down
3 changes: 2 additions & 1 deletion src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ struct raft_repl_dev_superblk : public repl_dev_superblk {

using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;

ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED);
ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);

class RaftReplDevMetrics : public sisl::MetricsGroup {
public:
Expand Down Expand Up @@ -128,6 +128,7 @@ class RaftReplDev : public ReplDev,
uint32_t get_blk_size() const override;
repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); }
bool is_destroy_pending() const;
bool is_destroyed() const;
Clock::time_point destroyed_time() const { return m_destroyed_time; }

//////////////// Accessor/shortcut methods ///////////////////////
Expand Down
4 changes: 3 additions & 1 deletion src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,10 @@ raft_buf_ptr_t RaftStateMachine::pre_commit_ext(nuraft::state_machine::ext_op_pa

raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params const& params) {
int64_t lsn = s_cast< int64_t >(params.log_idx);

RD_LOGD("Raft channel: Received Commit message lsn {} store {} logdev {} size {}", lsn,
m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size());
repl_req_ptr_t rreq = lsn_to_req(lsn);
if (!rreq) { RD_LOGD("Raft channel got null rreq"); }
RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
if (rreq->is_proposer()) {
// This is the time to ensure flushing of journal happens in the proposer
Expand Down
1 change: 0 additions & 1 deletion src/lib/replication/service/generic_repl_svc.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ class GenericReplService : public ReplicationService {

hs_stats get_cap_stats() const override;
replica_id_t get_my_repl_uuid() const { return m_my_uuid; }

// void resource_audit() override;

protected:
Expand Down
3 changes: 3 additions & 0 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ void RaftReplService::start() {

// Step 8: Start a reaper thread which wakes up time-to-time and fetches pending data or cleans up old requests etc
start_reaper_thread();

// Delete any unopened logstores.
hs()->logstore_service().delete_unopened_logdevs();
}

void RaftReplService::stop() {
Expand Down
1 change: 0 additions & 1 deletion src/lib/replication/service/raft_repl_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ class RaftReplService : public GenericReplService,

private:
void raft_group_config_found(sisl::byte_view const& buf, void* meta_cookie);

void start_reaper_thread();
void stop_reaper_thread();
void fetch_pending_data();
Expand Down
2 changes: 2 additions & 0 deletions src/tests/test_log_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ TEST_F(LogDevTest, DeleteUnopenedLogDev) {
LOGINFO("Restart homestore");
restart();

// Explicitly call delete for unopened ones.
hs()->logstore_service().delete_unopened_logdevs();
auto log_devs = logstore_service().get_all_logdevs();
ASSERT_EQ(log_devs.size(), id_set.size() / 2);
for (auto& logdev : log_devs) {
Expand Down
25 changes: 23 additions & 2 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
#include "common/homestore_config.hpp"
#include "common/homestore_assert.hpp"
#include "common/homestore_utils.hpp"

#define private public
#include "test_common/hs_repl_test_common.hpp"
#include "replication/service/raft_repl_service.h"
#include "replication/repl_dev/raft_repl_dev.h"
Expand Down Expand Up @@ -251,10 +253,21 @@ class RaftReplDevTest : public testing::Test {
ASSERT_EQ(err, ReplServiceError::OK) << "Error in destroying the group";
});
}

for (auto const& db : dbs_) {
auto repl_dev = std::dynamic_pointer_cast< RaftReplDev >(db->repl_dev());
do {
std::this_thread::sleep_for(std::chrono::seconds(1));
auto& raft_repl_svc = dynamic_cast< RaftReplService& >(hs()->repl_service());
raft_repl_svc.gc_repl_devs();
LOGINFO("Waiting for repl dev to get destroyed");
} while (!repl_dev->is_destroyed());
}
}

void generate_writes(uint64_t data_size, uint32_t max_size_per_iov, shared< TestReplicatedDB > db = nullptr) {
if (db == nullptr) { db = pick_one_db(); }
LOGINFO("Writing on group_id={}", db->repl_dev()->group_id());
db->db_write(data_size, max_size_per_iov);
}

Expand Down Expand Up @@ -563,9 +576,12 @@ TEST_F(RaftReplDevTest, Leader_Restart) {
// Step 1: Fill up entries on all replicas
uint64_t entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >();
this->write_on_leader(entries_per_attempt, true /* wait for commit on all replicas */);
std::this_thread::sleep_for(std::chrono::seconds(3));

// Step 2: Restart replica-0 (Leader) with a very long delay so that it is lagging behind
this->restart_replica(0, 10 /* shutdown_delay_sec */);
LOGINFO("Restart leader");
this->restart_replica(0, 15 /* shutdown_delay_sec */);
std::this_thread::sleep_for(std::chrono::seconds(3));

// Step 3: While the original leader is down, write entries into the new leader
LOGINFO("After original leader is shutdown, insert more entries into the new leader");
Expand Down Expand Up @@ -647,14 +663,16 @@ TEST_F(RaftReplDevTest, RemoveReplDev) {
this->write_on_leader(entries_per_attempt, false /* wait for commit on all */, dbs_.back());
std::this_thread::sleep_for(std::chrono::milliseconds(2));
this->remove_db(dbs_.back(), true /* wait_for_removal */);
std::this_thread::sleep_for(std::chrono::seconds(2));

// Step 3: Shutdown one of the follower and remove another repl_dev, once the follower is up, it should remove the
// repl_dev and proceed
LOGINFO("Shutdown one of the followers (replica=1) and then remove dbs on other members. Expect replica=1 to "
"remove after it is up");
this->restart_replica(1, 15 /* shutdown_delay_sec */);
LOGINFO("After restart replica 1 {}", dbs_.size());
this->remove_db(dbs_.back(), true /* wait_for_removal */);

LOGINFO("Remove last db {}", dbs_.size());
// TODO: Once generic crash flip/test_infra is available, use flip to crash during removal and restart them to see
// if records are being removed
g_helper->sync_for_cleanup_start();
Expand Down Expand Up @@ -682,9 +700,11 @@ TEST_F(RaftReplDevTest, GCReplReqs) {
}

this->write_on_leader(100 /* num_entries */, true /* wait_for_commit */);
std::this_thread::sleep_for(std::chrono::seconds(2));

// Step 2: Restart replica-0 (Leader)
this->restart_replica(0, 10);
std::this_thread::sleep_for(std::chrono::seconds(2));

LOGINFO("After original leader is shutdown, insert more entries into the new leader");
this->write_on_leader(100, true /* wait for commit on all replicas */);
Expand Down Expand Up @@ -726,6 +746,7 @@ int main(int argc, char* argv[]) {
//
HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) {
s.consensus.leadership_expiry_ms = -1; // -1 means never expires;
s.generic.repl_dev_cleanup_interval_sec = 0;

// only reset when user specified the value for test;
if (SISL_OPTIONS.count("snapshot_distance")) {
Expand Down

0 comments on commit be6234a

Please sign in to comment.