
Commit

Merge branch 'master' into yk_fix
xiaoxichen authored Aug 30, 2024
2 parents f8d1a4c + 7cd1eab commit ee71ec6
Showing 21 changed files with 133 additions and 85 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_dependencies.yml
@@ -165,6 +165,7 @@ jobs:
- name: Build Cache
run: |
pre=$([[ "${{ inputs.build-type }}" != "Debug" ]] && echo "-o sisl:prerelease=${{ inputs.prerelease }}" || echo "")
sudo rm -rf $ANDROID_HOME
conan install \
-c tools.build:skip_test=True \
${pre} \
@@ -219,7 +220,6 @@ jobs:
run: |
sanitize=$([[ "${{ inputs.tooling }}" == "Sanitize" ]] && echo "True" || echo "False")
pre=$([[ "${{ inputs.build-type }}" != "Debug" ]] && echo "-o sisl:prerelease=${{ inputs.prerelease }}" || echo "")
sudo rm -rf /usr/local/lib/android/
conan create \
${pre} \
-o sisl:malloc_impl=${{ inputs.malloc-impl }} \
1 change: 1 addition & 0 deletions .jenkins/Dockerfile
@@ -15,6 +15,7 @@ RUN set -eux; \
COPY test_index_btree /usr/local/bin/test_index_btree
COPY test_meta_blk_mgr /usr/local/bin/test_meta_blk_mgr
COPY test_log_store /usr/local/bin/test_log_store
COPY test_home_raft_logstore /usr/local/bin/test_home_raft_logstore
COPY test_log_store_long_run /usr/local/bin/test_log_store_long_run
COPY test_data_service /usr/local/bin/test_data_service
COPY test_raft_repl_dev /usr/local/bin/test_raft_repl_dev
1 change: 1 addition & 0 deletions .jenkins/jenkinsfile_nightly
@@ -44,6 +44,7 @@ pipeline {
sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_index_btree' -exec cp {} .jenkins/test_index_btree \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_meta_blk_mgr' -exec cp {} .jenkins/test_meta_blk_mgr \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_log_store' -exec cp {} .jenkins/test_log_store \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_home_raft_logstore' -exec cp {} .jenkins/test_home_raft_logstore \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_log_store_long_run' -exec cp {} .jenkins/test_log_store_long_run \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_data_service' -exec cp {} .jenkins/test_data_service \\;"
sh "find ${CONAN_USER_HOME} -type f -wholename '*tests/test_raft_repl_dev' -exec cp {} .jenkins/test_raft_repl_dev \\;"
3 changes: 2 additions & 1 deletion conanfile.py
@@ -9,7 +9,8 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.4.53"
version = "6.4.55"


homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
1 change: 1 addition & 0 deletions src/include/homestore/logstore/log_store.hpp
@@ -225,6 +225,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
* @brief Rollback the given instance to the given sequence number
*
* @param to_lsn Sequence number to which logs are to be rolled back;
* to_lsn will be the tail_lsn after rollback.
* @return True on success
*/
bool rollback(logstore_seq_num_t to_lsn);
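The clarified contract (a successful rollback makes to_lsn the tail_lsn) can be illustrated with a minimal sketch; only HomeLogStore::rollback and the header path come from this diff, the wrapper is hypothetical:

```cpp
#include <homestore/logstore/log_store.hpp>

// Hypothetical wrapper illustrating the documented contract: on success every
// record with lsn > to_lsn is discarded, to_lsn becomes the tail_lsn, and the
// next append continues from to_lsn + 1.
bool rollback_to(std::shared_ptr< homestore::HomeLogStore > const& ls,
                 homestore::logstore_seq_num_t to_lsn) {
    return ls->rollback(to_lsn);
}
```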
9 changes: 7 additions & 2 deletions src/include/homestore/replication/repl_dev.h
@@ -18,13 +18,15 @@ template < typename T >
using ptr = std::shared_ptr< T >;

class buffer;
class log_entry;
} // namespace nuraft

namespace homestore {
class ReplDev;
class ReplDevListener;
struct repl_req_ctx;
using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;
using repl_req_ptr_t = boost::intrusive_ptr< repl_req_ctx >;

VENUM(repl_req_state_t, uint32_t,
@@ -56,7 +58,9 @@ struct repl_key {
};

bool operator==(repl_key const& other) const = default;
std::string to_string() const { return fmt::format("server={}, term={}, dsn={}", server_id, term, dsn); }
std::string to_string() const {
return fmt::format("server={}, term={}, dsn={}, hash={}", server_id, term, dsn, Hasher()(*this));
}
};

using repl_snapshot = nuraft::snapshot;
@@ -149,7 +153,6 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::

raft_buf_ptr_t& raft_journal_buf();
uint8_t* raw_journal_buf();

/////////////////////// Non modifiers methods //////////////////
std::string to_string() const;
std::string to_compact_string() const;
@@ -203,6 +206,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
void set_lsn(int64_t lsn);
void add_state(repl_req_state_t s);
bool add_state_if_not_already(repl_req_state_t s);
void set_lentry(nuraft::ptr< nuraft::log_entry > const& lentry) { m_lentry = lentry; }
void clear();
flatbuffers::FlatBufferBuilder& create_fb_builder() { return m_fb_builder; }
void release_fb_builder() { m_fb_builder.Release(); }
@@ -234,6 +238,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > m_journal_buf; // Buf for the journal entry
repl_journal_entry* m_journal_entry{nullptr}; // pointer to the journal entry
bool m_is_jentry_localize_pending{false}; // Is the journal entry needs to be localized from remote
nuraft::ptr< nuraft::log_entry > m_lentry;

/////////////// Replication state related section /////////////////
std::atomic< uint32_t > m_state{uint32_cast(repl_req_state_t::INIT)}; // State of the replication request
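The new m_lentry member (set through set_lentry()) pins the originating nuraft::log_entry for the request's lifetime, so data referenced out of the journal entry cannot be freed underneath the request. A standalone sketch of the same ownership-pinning pattern, with illustrative stand-in types:

```cpp
#include <cstdint>
#include <memory>
#include <vector>

struct Entry {                  // stand-in for nuraft::log_entry
    std::vector< uint8_t > buf; // owns the serialized payload
};

struct Request {                            // stand-in for repl_req_ctx
    std::shared_ptr< Entry > entry_holder;  // mirrors m_lentry
    uint8_t const* payload{nullptr};        // raw pointer into entry_holder->buf

    void set_entry(std::shared_ptr< Entry > const& e) { // mirrors set_lentry()
        entry_holder = e;        // pin the owner for this request's lifetime
        payload = e->buf.data(); // safe to dereference while pinned
    }
};
```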
53 changes: 30 additions & 23 deletions src/lib/logstore/log_dev.cpp
@@ -435,32 +435,39 @@ bool LogDev::flush() {
return false;
}

LogGroup* lg = prepare_flush(new_idx - m_last_flush_idx + 4); // Estimate 4 more extra in case of parallel writes
if (sisl_unlikely(!lg)) {
THIS_LOGDEV_LOG(TRACE, "Log idx {} last_flush_idx {} prepare flush failed", new_idx, m_last_flush_idx);
return false;
}
auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed);
HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size {}", sz, lg->actual_data_size());
off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size());
lg->m_log_dev_offset = offset;

HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full");
THIS_LOGDEV_LOG(TRACE, "Flushing log group data size={} at offset={} log_group={}", lg->actual_data_size(), offset,
*lg);

HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_records_distribution, lg->nrecords());
HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_size_distribution, lg->actual_data_size());
// The number of logs a single LogGroup can flush has an upper limit. To guarantee that every log which
// needs to be flushed actually reaches the physical device, loop and create multiple log groups as
// necessary.
for (; m_last_flush_idx < new_idx;) {
LogGroup* lg =
prepare_flush(new_idx - m_last_flush_idx + 4); // Estimate 4 more extra in case of parallel writes
if (sisl_unlikely(!lg)) {
THIS_LOGDEV_LOG(TRACE, "Log idx {} last_flush_idx {} prepare flush failed", new_idx, m_last_flush_idx);
return false;
}
auto sz = m_pending_flush_size.fetch_sub(lg->actual_data_size(), std::memory_order_relaxed);
HS_REL_ASSERT_GE((sz - lg->actual_data_size()), 0, "size {} lg size {}", sz, lg->actual_data_size());
off_t offset = m_vdev_jd->alloc_next_append_blk(lg->header()->total_size());
lg->m_log_dev_offset = offset;

HS_REL_ASSERT_NE(lg->m_log_dev_offset, INVALID_OFFSET, "log dev is full");
THIS_LOGDEV_LOG(TRACE, "Flushing log group data size={} at offset={} log_group={}", lg->actual_data_size(),
offset, *lg);

HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_records_distribution, lg->nrecords());
HISTOGRAM_OBSERVE(logstore_service().m_metrics, logdev_flush_size_distribution, lg->actual_data_size());

// TODO:: add logic to handle this error in upper layer
auto error = m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset);
if (error) {
THIS_LOGDEV_LOG(ERROR, "Fail to sync write to journal vdev, error code {} : {}", error.value(),
error.message());
return false;
}

// FIXME:: add logic to handle this error in upper layer
auto error = m_vdev_jd->sync_pwritev(lg->iovecs().data(), int_cast(lg->iovecs().size()), lg->m_log_dev_offset);
if (error) {
THIS_LOGDEV_LOG(ERROR, "Fail to sync write to journal vde , error code {} : {}", error.value(),
error.message());
return false;
on_flush_completion(lg);
}

on_flush_completion(lg);
return true;
}

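The rewritten flush() loops until m_last_flush_idx catches up with new_idx, carving one LogGroup per iteration. A standalone sketch of the same chunked-drain pattern; the names and the write hook are illustrative, not HomeStore APIs:

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <vector>

// Flush `pending` in groups of at most `group_cap` records, stopping on the
// first failed group write -- the shape of the new LogDev::flush() loop.
bool flush_all(std::vector< int > const& pending, std::size_t group_cap,
               std::function< bool(std::size_t first, std::size_t count) > const& write_group) {
    std::size_t flushed = 0;
    while (flushed < pending.size()) {
        std::size_t const n = std::min(group_cap, pending.size() - flushed);
        if (!write_group(flushed, n)) { return false; } // surface the I/O error
        flushed += n; // advance the cursor, like m_last_flush_idx
    }
    return true;
}
```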
13 changes: 9 additions & 4 deletions src/lib/logstore/log_store.cpp
@@ -177,6 +177,7 @@ void HomeLogStore::on_log_found(logstore_seq_num_t seq_num, const logdev_key& ld

void HomeLogStore::truncate(logstore_seq_num_t upto_lsn, bool in_memory_truncate_only) {
if (upto_lsn < m_start_lsn) { return; }
flush();
#ifndef NDEBUG
auto cs = get_contiguous_completed_seq_num(0);
if (upto_lsn > cs) {
@@ -283,13 +284,17 @@ void HomeLogStore::flush(logstore_seq_num_t upto_lsn) {
}

bool HomeLogStore::rollback(logstore_seq_num_t to_lsn) {
// Validate if the lsn to which it is rolledback to is not truncated.
auto ret = m_records.status(to_lsn + 1);
if (ret.is_out_of_range) {
HS_LOG_ASSERT(false, "Attempted to rollback to {} which is already truncated", to_lsn);
// Fast path
if (to_lsn == m_tail_lsn.load()) {
return true;
}

if (to_lsn > m_tail_lsn.load() || to_lsn < m_start_lsn.load()) {
HS_LOG_ASSERT(false, "Attempted to rollback to {} which is not in the range of [{}, {}]", to_lsn, m_start_lsn.load(), m_tail_lsn.load());
return false;
}

THIS_LOGSTORE_LOG(INFO, "Rolling back to {}, tail {}", to_lsn, m_tail_lsn.load());
bool do_flush{false};
do {
{
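Since truncate() now calls flush() up front, pending appends are persisted before the truncation point moves; callers no longer need a separate flush. A hedged usage sketch (the wrapper is hypothetical; the two-argument signature matches the definition shown above):

```cpp
#include <homestore/logstore/log_store.hpp>

// Hypothetical wrapper: truncate() flushes internally first, so a bare call is
// safe even while appends are still pending in memory.
void truncate_upto(std::shared_ptr< homestore::HomeLogStore > const& ls,
                   homestore::logstore_seq_num_t upto_lsn) {
    ls->truncate(upto_lsn, true /* in_memory_truncate_only */);
}
```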
19 changes: 3 additions & 16 deletions src/lib/replication/log_store/home_raft_log_store.cpp
@@ -264,8 +264,8 @@ raft_buf_ptr_t HomeRaftLogStore::pack(ulong index, int32_t cnt) {
[this, &out_buf, &remain_cnt]([[maybe_unused]] store_lsn_t cur, const log_buffer& entry) mutable -> bool {
if (remain_cnt-- > 0) {
size_t avail_size = out_buf->size() - out_buf->pos();
if (avail_size < entry.size()) {
avail_size += std::max(out_buf->size() * 2, (size_t)entry.size());
if (avail_size < entry.size() + sizeof(uint32_t)) {
avail_size += std::max(out_buf->size() * 2, (size_t)entry.size() + sizeof(uint32_t));
out_buf = nuraft::buffer::expand(*out_buf, avail_size);
}
REPL_STORE_LOG(TRACE, "packing lsn={} of size={}, avail_size in buffer={}", to_repl_lsn(cur),
@@ -322,20 +322,7 @@ bool HomeRaftLogStore::compact(ulong compact_lsn) {
append(m_dummy_log_entry);
}
}

m_log_store->flush(to_store_lsn(compact_lsn));

// this will only truncate in memory, and not on disk;
// we rely on resrouce mgr timer to trigger real truncate for all log stores in system;
// this will be friendly for multiple logstore on same logdev;
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("force_home_raft_log_truncate")) {
REPL_STORE_LOG(TRACE, "Flip force_home_raft_log_truncate is enabled, force truncation, compact_lsn={}",
compact_lsn);
m_log_store->truncate(to_store_lsn(compact_lsn));
}
#endif

m_log_store->truncate(to_store_lsn(compact_lsn));
return true;
}

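The pack() fix reserves sizeof(uint32_t) beyond each entry's payload, which suggests every packed entry is written as a 4-byte length prefix followed by the payload; checking only the payload size could overflow the buffer. A standalone sketch under that assumption, with std::vector standing in for nuraft::buffer:

```cpp
#include <cstdint>
#include <cstring>
#include <vector>

// Size-prefixed packing: reserve header + payload before writing, the
// invariant the fix restores by adding sizeof(uint32_t) to the space check.
void pack_entry(std::vector< uint8_t >& out, uint8_t const* data, uint32_t len) {
    out.reserve(out.size() + sizeof(uint32_t) + len); // header + payload
    uint8_t hdr[sizeof(uint32_t)];
    std::memcpy(hdr, &len, sizeof(hdr));              // 4-byte length prefix
    out.insert(out.end(), hdr, hdr + sizeof(hdr));
    out.insert(out.end(), data, data + len);
}
```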
1 change: 1 addition & 0 deletions src/lib/replication/log_store/home_raft_log_store.h
@@ -30,6 +30,7 @@
namespace homestore {

using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;

class HomeRaftLogStore : public nuraft::log_store {
public:
3 changes: 2 additions & 1 deletion src/lib/replication/repl_dev/common.cpp
@@ -106,7 +106,8 @@ uint8_t* repl_req_ctx::raw_journal_buf() { return std::get< std::unique_ptr< uin

void repl_req_ctx::set_lsn(int64_t lsn) {
DEBUG_ASSERT((m_lsn == -1) || (m_lsn == lsn),
"Changing lsn for request={} on the fly can cause race condition, not expected", to_string());
"Changing lsn for request={} on the fly can cause race condition, not expected. lsn {}, m_lsn {}",
to_string(), lsn, m_lsn);
m_lsn = lsn;
LOGTRACEMOD(replication, "Setting lsn={} for request={}", lsn, to_string());
}
32 changes: 26 additions & 6 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -77,11 +77,12 @@ RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk
}

RD_LOG(INFO,
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, compact_lsn={} "
"next_dsn={} "
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, "
"compact_lsn={}, checkpoint_lsn:{}, next_dsn={} "
"log_dev={} log_store={}",
(load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id,
m_commit_upto_lsn.load(), m_compact_lsn.load(), m_next_dsn.load(), m_rd_sb->logdev_id, m_rd_sb->logstore_id);
m_commit_upto_lsn.load(), m_compact_lsn.load(), m_rd_sb->checkpoint_lsn, m_next_dsn.load(),
m_rd_sb->logdev_id, m_rd_sb->logstore_id);

#ifdef _PRERELEASE
m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) {
@@ -746,7 +747,7 @@ void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse respons
RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
}

void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
void RaftReplDev::commit_blk(repl_req_ptr_t rreq) {
if (rreq->local_blkid().is_valid()) {
if (data_service().commit_blk(rreq->local_blkid()) != BlkAllocStatus::SUCCESS) {
if (hs()->device_mgr()->is_boot_in_degraded_mode() && m_log_store_replay_done)
Expand All @@ -755,6 +756,10 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
RD_DBG_ASSERT(false, "fail to commit blk when applying log in non-degraded mode.")
}
}
}

void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
commit_blk(rreq);

// Remove the request from repl_key map.
m_repl_key_req_map.erase(rreq->rkey());
@@ -1045,7 +1050,13 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
if (entry->get_val_type() != nuraft::log_val_type::app_log) { continue; }
if (entry->get_buf_ptr()->size() == 0) { continue; }
auto req = m_state_machine->localize_journal_entry_prepare(*entry);
if (req == nullptr) {
// TODO :: we need to identify whether this log entry should be appended to the log store.
// 1 for lsn: if the req's lsn is not -1, this log has been localized and appended before, so we
// should skip it.
// 2 for dsn: if the req's dsn is less than next_dsn, this log has already been committed, so we
// should skip it.
// Here we only check the first condition for now; revisit if we need to check the second.
if (req == nullptr || req->lsn() != -1) {
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
return {true, nuraft::cb_func::ReturnCode::ReturnNull};
}
Expand Down Expand Up @@ -1181,10 +1192,14 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
}

rreq->set_lsn(repl_lsn);
// keep lentry in scope for the life cycle of the rreq
rreq->set_lentry(lentry);
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size);
RD_LOGD("Replay log on restart, rreq=[{}]", rreq->to_string());

if (repl_lsn > m_rd_sb->durable_commit_lsn) {
// In memory state of these blks is lost. Commit them now to avoid usage of same blk twice.
commit_blk(rreq);
m_state_machine->link_lsn_to_req(rreq, int64_cast(repl_lsn));
return;
}
@@ -1202,7 +1217,12 @@ bool RaftReplDev::is_resync_mode() {
int64_t const leader_commited_lsn = raft_server()->get_leader_committed_log_idx();
int64_t const my_log_idx = raft_server()->get_last_log_idx();
auto diff = leader_commited_lsn - my_log_idx;
return diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold);
bool resync_mode = (diff > HS_DYNAMIC_CONFIG(consensus.resync_log_idx_threshold));
if (resync_mode) {
RD_LOGD("Raft Channel: Resync mode, leader_commited_lsn={}, my_log_idx={}, diff={}", leader_commited_lsn,
my_log_idx, diff);
}
return resync_mode;
}

} // namespace homestore
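is_resync_mode() now logs when it trips; the decision itself is a lag comparison between the leader's committed index and the follower's last log index. A minimal sketch (the real threshold comes from consensus.resync_log_idx_threshold; the default here is illustrative):

```cpp
#include <cstdint>

// A follower trailing the leader's committed index by more than the threshold
// is treated as being in resync mode.
bool is_resync(int64_t leader_committed_lsn, int64_t my_log_idx,
               int64_t threshold = 10000 /* illustrative, config-driven in HomeStore */) {
    return (leader_committed_lsn - my_log_idx) > threshold;
}
```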
10 changes: 10 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -31,6 +31,7 @@ struct raft_repl_dev_superblk : public repl_dev_superblk {
#pragma pack()

using raft_buf_ptr_t = nuraft::ptr< nuraft::buffer >;
using raft_cluster_config_ptr_t = nuraft::ptr< nuraft::cluster_config >;

ENUM(repl_dev_stage_t, uint8_t, INIT, ACTIVE, DESTROYING, DESTROYED, PERMANENT_DESTROYED);

@@ -229,6 +230,14 @@ class RaftReplDev : public ReplDev,
*/
void on_restart();

/**
* \brief This method forces the replica to leave the group without waiting for the destroy message to commit.
* It is used when the repl_dev is a stale member of a destroyed group: the stale member never received the
* destroy message, and since the group is already destroyed, no leader will ever resend it. Force-leaving
* keeps the stale member from remaining part of the group.
*/
void force_leave() { leave(); }

protected:
//////////////// All nuraft::state_mgr overrides ///////////////////////
nuraft::ptr< nuraft::cluster_config > load_config() override;
@@ -258,6 +267,7 @@ class RaftReplDev : public ReplDev,
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
void commit_blk(repl_req_ptr_t rreq);
};

} // namespace homestore
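A hypothetical call site for the new force_leave(): a replica that was down while its group was destroyed never receives the destroy message, and no leader will resend it, so the stale member leaves unilaterally:

```cpp
#include "replication/repl_dev/raft_repl_dev.h" // internal header; include path illustrative

// Hypothetical reaper-side helper: detection of the destroyed group is
// assumed to happen elsewhere (e.g. via the replication service's GC pass).
void reap_stale_member(homestore::RaftReplDev& rdev, bool group_destroyed_elsewhere) {
    if (group_destroyed_elsewhere) { rdev.force_leave(); }
}
```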
5 changes: 5 additions & 0 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
@@ -201,6 +201,11 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
return m_success_ptr;
}

void RaftStateMachine::commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) {
RD_LOGD("Raft channel: Commit new cluster conf, log_idx = {}", log_idx);
// TODO: add more logic here if necessary
}

void RaftStateMachine::iterate_repl_reqs(std::function< void(int64_t, repl_req_ptr_t rreq) > const& cb) {
for (auto [key, rreq] : m_lsn_req_map) {
cb(key, rreq);
1 change: 1 addition & 0 deletions src/lib/replication/repl_dev/raft_state_machine.h
@@ -108,6 +108,7 @@ class RaftStateMachine : public nuraft::state_machine {
uint64_t last_commit_index() override;
raft_buf_ptr_t pre_commit_ext(const nuraft::state_machine::ext_op_params& params) override;
raft_buf_ptr_t commit_ext(const nuraft::state_machine::ext_op_params& params) override;
void commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) override;
void rollback(uint64_t lsn, nuraft::buffer&) override { LOGCRITICAL("Unimplemented rollback on: [{}]", lsn); }
void become_ready();

3 changes: 2 additions & 1 deletion src/lib/replication/service/raft_repl_service.cpp
@@ -338,7 +338,8 @@ void RaftReplService::start_reaper_thread() {
m_reaper_fiber = iomanager.iofiber_self();

// Schedule the rdev garbage collector timer
LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec));
LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds",
HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec));
m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer(
HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */,
nullptr, [this](void*) {