Skip to content

Commit

Permalink
Duplication Handling (#611)
Browse files Browse the repository at this point in the history
* Duplication Handling in Blob Write

This commit addresses duplication issues on the follower side caused by resync from the leader, it mainly happens when resend snapshot mesg during baseline resync and apply log after snapshot completion. This helps avoid unnecessary GC due to duplicated data.

Key Changes:
- Utilize allocation hints to check data existence via the application listener.
- Introduce `committed_blk_id` in `blk_alloc_hints` to indicate already allocated and committed blocks and pass it from application to HS, preventing reallocation and recommitment.
- In `alloc_local_blks()`, if `committed_blk_id` is returned, also add states `DATA_RECEIVED`, `DATA_WRITTEN`, and `DATA_COMMITTED` to skip async_write() and commit_blk().

On the leader side (`RaftReplDev::async_alloc_write`), duplication is treated as an error, as the leader should not propose duplicate data, which may result from mistakes.

* Add UT and bump up to 6.6.0

* Move alloc blk logic into rreq.init

This commit addresses the issue encountered during a restart.
In the previous commit, the DATA_COMMITTED state was used to skip the commit_blk operation. However, after restart, repl_req state DATA_COMMITTED is lost. In this case, if the lsn of log entry is greater than durable_commit_lsn, the data will be committed directly without the opportunity to find if the data is duplicated, as a result, commit_blk may fail due to duplication.
  • Loading branch information
yuwmao authored Dec 23, 2024
1 parent f69e78e commit 9f9bd45
Show file tree
Hide file tree
Showing 9 changed files with 199 additions and 49 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.29"
version = "6.6.0"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/blk.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ struct blk_alloc_hints {
blk_temp_t desired_temp{0}; // Temperature hint for the device
std::optional< uint32_t > pdev_id_hint; // which physical device to pick (hint if any) -1 for don't care
std::optional< chunk_num_t > chunk_id_hint; // any specific chunk id to pick for this allocation
std::optional<MultiBlkId> committed_blk_id; // blk id indicates the blk was already allocated and committed, don't allocate and commit again
std::optional< stream_id_t > stream_id_hint; // any specific stream to pick
std::optional< uint64_t > application_hint; // hints in uint64 what will be passed opaque to select_chunk
bool can_look_for_other_chunk{true}; // If alloc on device not available can I pick other device
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ VENUM(ReplServiceError, int32_t,
NOT_IMPLEMENTED = -10001,
NO_SPACE_LEFT = -20000,
DRIVE_WRITE_ERROR = -20001,
DATA_DUPLICATED = -20002,
FAILED = -32768);
// clang-format on

Expand Down
7 changes: 4 additions & 3 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ VENUM(repl_req_state_t, uint32_t,
DATA_WRITTEN = 1 << 2, // Data has been written to the storage
LOG_RECEIVED = 1 << 3, // Log is received and waiting for data
LOG_FLUSHED = 1 << 4, // Log has been flushed
ERRORED = 1 << 5 // Error has happened and cleaned up
ERRORED = 1 << 5, // Error has happened and cleaned up
DATA_COMMITTED = 1 << 6 // Data has already been committed, used in duplication handling, will skip commit_blk
)

VENUM(journal_type_t, uint16_t,
Expand Down Expand Up @@ -142,8 +143,8 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
public:
repl_req_ctx() { m_start_time = Clock::now(); }
virtual ~repl_req_ctx();
void init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size);
ReplServiceError init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size, cshared< ReplDevListener >& listener);

/////////////////////// All getters ///////////////////////
repl_key const& rkey() const { return m_rkey; }
Expand Down
30 changes: 28 additions & 2 deletions src/lib/replication/repl_dev/common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@
#include <common/homestore_config.hpp>
#include "replication/repl_dev/common.h"
#include <libnuraft/nuraft.hxx>
#include <iomgr/iomgr_flip.hpp>

namespace homestore {

void repl_req_ctx::init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size) {
ReplServiceError repl_req_ctx::init(repl_key rkey, journal_type_t op_code, bool is_proposer, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size, cshared< ReplDevListener >& listener) {
m_rkey = std::move(rkey);
#ifndef NDEBUG
if (data_size > 0) {
Expand All @@ -24,6 +25,18 @@ void repl_req_ctx::init(repl_key rkey, journal_type_t op_code, bool is_proposer,
m_header = user_header;
m_key = key;
m_is_jentry_localize_pending = (!is_proposer && (data_size > 0)); // Pending on the applier and with linked data

// We need to allocate the block if the req has data linked, since entry doesn't exist or if it exist, two threads(data channel and raft channel) are trying to do the same
// thing. So take state mutex and allocate the blk
std::unique_lock< std::mutex > lg(m_state_mtx);
if (has_linked_data() && !has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto alloc_status = alloc_local_blks(listener, data_size);
if (alloc_status != ReplServiceError::OK) {
LOGERROR("Allocate blk for rreq failed error={}", alloc_status);
}
return alloc_status;
}
return ReplServiceError::OK;
}

repl_req_ctx::~repl_req_ctx() {
Expand Down Expand Up @@ -91,6 +104,19 @@ ReplServiceError repl_req_ctx::alloc_local_blks(cshared< ReplDevListener >& list
auto const hints_result = listener->get_blk_alloc_hints(m_header, data_size);
if (hints_result.hasError()) { return hints_result.error(); }

if (hints_result.value().committed_blk_id.has_value()) {
//if the committed_blk_id is already present, use it and skip allocation and commitment
LOGINFO("For Repl_key=[{}] data already exists, skip", rkey().to_string());
m_local_blkid = hints_result.value().committed_blk_id.value();
add_state(repl_req_state_t::BLK_ALLOCATED);
add_state(repl_req_state_t::DATA_RECEIVED);
add_state(repl_req_state_t::DATA_WRITTEN);
add_state(repl_req_state_t::DATA_COMMITTED);
m_data_received_promise.setValue();
m_data_written_promise.setValue();
return ReplServiceError::OK;
}

auto status = data_service().alloc_blks(sisl::round_up(uint32_cast(data_size), data_service().get_blk_size()),
hints_result.value(), m_local_blkid);
if (status != BlkAllocStatus::SUCCESS) {
Expand Down
63 changes: 34 additions & 29 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ AsyncReplResult<> RaftReplDev::replace_member(const replica_member_info& member_
sisl::blob header(r_cast< uint8_t* >(&members), sizeof(replace_members_ctx));
rreq->init(
repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
journal_type_t::HS_CTRL_REPLACE, true, header, sisl::blob{}, 0);
journal_type_t::HS_CTRL_REPLACE, true, header, sisl::blob{}, 0, m_listener);

auto err = m_state_machine->propose_to_raft(std::move(rreq));
if (err != ReplServiceError::OK) {
Expand Down Expand Up @@ -251,7 +251,7 @@ folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
// here, we set the dsn to a new one , which is definitely unique in the follower, so that the new rreq will not
// have a conflict with the old rreq.
rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0);
journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0, m_listener);

auto err = m_state_machine->propose_to_raft(std::move(rreq));
if (err != ReplServiceError::OK) {
Expand Down Expand Up @@ -292,25 +292,28 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const&
}
}

rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
auto status = rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
data.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true /* is_proposer */,
header, key, data.size);
header, key, data.size, m_listener);

// Add the request to the repl_dev_rreq map, it will be accessed throughout the life cycle of this request
auto const [it, happened] = m_repl_key_req_map.emplace(rreq->rkey(), rreq);
RD_DBG_ASSERT(happened, "Duplicate repl_key={} found in the map", rreq->rkey().to_string());

if (status != ReplServiceError::OK) {
RD_LOGD("Initializing rreq failed error={}, failing this req", status);
handle_error(rreq, status);
return;
}

// If it is header only entry, directly propose to the raft
if (rreq->has_linked_data()) {
push_data_to_all_followers(rreq, data);

// Step 1: Alloc Blkid
auto const status = rreq->alloc_local_blks(m_listener, data.size);
if (status != ReplServiceError::OK) {
RD_LOGD("Allocating blks failed error={}, failing this req", status);
handle_error(rreq, status);
if (rreq->is_proposer() && rreq->has_state(repl_req_state_t::DATA_COMMITTED)) {
RD_LOGD("data blks has already been allocated and committed, failing this req");
handle_error(rreq, ReplServiceError::DATA_DUPLICATED);
return;
}
push_data_to_all_followers(rreq, data);

COUNTER_INCREMENT(m_metrics, total_write_cnt, 1);
COUNTER_INCREMENT(m_metrics, outstanding_data_write_cnt, 1);
Expand Down Expand Up @@ -498,32 +501,24 @@ repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, journal_typ
}
}

// We need to allocate the block, since entry doesn't exist or if it exist, two threads are trying to do the same
// thing. So take state mutex and allocate the blk
std::unique_lock< std::mutex > lg(rreq->m_state_mtx);
rreq->init(rkey, code, false /* is_proposer */, user_header, key, data_size);

// There is no data portion, so there is not need to allocate
// rreq->init will allocate the block if it has linked data.
auto status = rreq->init(rkey, code, false /* is_proposer */, user_header, key, data_size, m_listener);
if (!rreq->has_linked_data()) { return rreq; }
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) { return rreq; }

auto alloc_status = rreq->alloc_local_blks(m_listener, data_size);
#ifdef _PRERELEASE
if (is_data_channel) {
if (iomgr_flip::instance()->test_flip("fake_reject_append_data_channel")) {
LOGINFO("Data Channel: Reject append_entries flip is triggered for rkey={}", rkey.to_string());
alloc_status = ReplServiceError::NO_SPACE_LEFT;
status = ReplServiceError::NO_SPACE_LEFT;
}
} else {
if (iomgr_flip::instance()->test_flip("fake_reject_append_raft_channel")) {
LOGINFO("Raft Channel: Reject append_entries flip is triggered for rkey={}", rkey.to_string());
alloc_status = ReplServiceError::NO_SPACE_LEFT;
status = ReplServiceError::NO_SPACE_LEFT;
}
}
#endif

if (alloc_status != ReplServiceError::OK) {
RD_LOGE("For Repl_key=[{}] alloc hints returned error={}, failing this req", rkey.to_string(), alloc_status);
if (status != ReplServiceError::OK) {
RD_LOGD("For Repl_key=[{}] alloc hints returned error={}, failing this req", rkey.to_string(), status);
// Do not call handle_error here, because handle_error is for rreq which needs to be terminated. This one can be
// retried.
return nullptr;
Expand Down Expand Up @@ -930,8 +925,8 @@ void RaftReplDev::handle_rollback(repl_req_ptr_t rreq) {
}
}

void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
commit_blk(rreq);
void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
if (!rreq->has_state(repl_req_state_t::DATA_COMMITTED)) { commit_blk(rreq); }

// Remove the request from repl_key map.
m_repl_key_req_map.erase(rreq->rkey());
Expand Down Expand Up @@ -979,7 +974,12 @@ void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err)
HS_REL_ASSERT(false, "Unexpected: LSN={} is already ready to commit, exist_rreq=[{}]",
rreq->lsn(), exist_rreq->to_string());
}

if (err == ReplServiceError::DATA_DUPLICATED) {
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);
m_listener->on_error(err, rreq->header(), rreq->key(), rreq);
rreq->clear();
return;
}
if (rreq->op_code() == journal_type_t::HS_DATA_LINKED) {
// Free the blks which is allocated already
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
Expand Down Expand Up @@ -1512,7 +1512,12 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
rreq->set_lsn(repl_lsn);
// keep lentry in scope for the lyfe cycle of the rreq
rreq->set_lentry(lentry);
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size);
auto status = rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry),
data_size, m_listener);
if (status != ReplServiceError::OK) {
RD_LOGE("Initializing rreq failed, rreq=[{}], error={}", rreq->to_string(), status);
}

// we load the log from log device, implies log flushed. We only flush log after data is written to data device.
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
rreq->add_state(repl_req_state_t::LOG_RECEIVED);
Expand Down
18 changes: 6 additions & 12 deletions src/lib/replication/repl_dev/solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,24 +30,18 @@ SoloReplDev::SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existi
void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
repl_req_ptr_t rreq) {
if (!rreq) { auto rreq = repl_req_ptr_t(new repl_req_ctx{}); }
rreq->init(repl_key{.server_id = 0, .term = 1, .dsn = 1},
value.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true, header, key,
value.size);

auto status = rreq->init(repl_key{.server_id = 0, .term = 1, .dsn = 1},
value.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true,
header, key, value.size, m_listener);
HS_REL_ASSERT_EQ(status, ReplServiceError::OK, "Error in allocating local blks");
// If it is header only entry, directly write to the journal
if (rreq->has_linked_data()) {
// Step 1: Alloc Blkid
auto const status = rreq->alloc_local_blks(m_listener, value.size);
HS_REL_ASSERT_EQ(status, ReplServiceError::OK, "Error in allocating local blks");

if (rreq->has_linked_data() && !rreq->has_state(repl_req_state_t::DATA_WRITTEN)) {
// Write the data
data_service().async_write(value, rreq->local_blkid()).thenValue([this, rreq = std::move(rreq)](auto&& err) {
HS_REL_ASSERT(!err, "Error in writing data"); // TODO: Find a way to return error to the Listener
write_journal(std::move(rreq));
});
} else {
write_journal(std::move(rreq));
}
} else { write_journal(std::move(rreq)); }
}

void SoloReplDev::write_journal(repl_req_ptr_t rreq) {
Expand Down
78 changes: 76 additions & 2 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ class TestReplicatedDB : public homestore::ReplDevListener {
struct journal_header {
uint64_t data_size;
uint64_t data_pattern;
uint64_t key_id; //put it in header to test duplication in alloc_local_blks
};

journal_header jheader;
uint64_t key_id;
sisl::sg_list write_sgs;
Expand All @@ -108,6 +108,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
write_sgs.size = 0;
read_sgs.size = 0;
key_id = (uint64_t)rand() << 32 | rand();
jheader.key_id = key_id;
}

~test_req() {
Expand Down Expand Up @@ -171,6 +172,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
cintrusive< repl_req_ctx >& ctx) override {
LOGINFOMOD(replication, "[Replica={}] Received error={} on key={}", g_helper->replica_num(), enum_name(error),
*(r_cast< uint64_t const* >(key.cbytes())));
g_helper->runner().comp_promise_.setException(folly::make_exception_wrapper<ReplServiceError>(error));
}

AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override {
Expand Down Expand Up @@ -316,7 +318,16 @@ class TestReplicatedDB : public homestore::ReplDevListener {

void free_user_snp_ctx(void*& user_snp_ctx) override {}

ReplResult< blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override {
ReplResult<blk_alloc_hints> get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override {
auto jheader = r_cast<test_req::journal_header const*>(header.cbytes());
Key k{.id_ = jheader->key_id};
auto iter = inmem_db_.find(k);
if (iter != inmem_db_.end()) {
LOGDEBUG("data already exists in mem db, key={}", k.id_);
auto hints = blk_alloc_hints{};
hints.committed_blk_id = iter->second.blkid_;
return hints;
}
return blk_alloc_hints{};
}
void on_replace_member(const replica_member_info& member_out, const replica_member_info& member_in) override {
Expand All @@ -335,6 +346,7 @@ class TestReplicatedDB : public homestore::ReplDevListener {
auto req = intrusive< test_req >(new test_req());
req->jheader.data_size = data_size;
req->jheader.data_pattern = ((long long)rand() << 32) | ++s_uniq_num;
req->jheader.key_id = req->key_id;
auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >();

LOGINFOMOD(replication, "[Replica={}] Db write key={} data_size={} pattern={} block_size={}",
Expand Down Expand Up @@ -591,6 +603,68 @@ class RaftReplDevTestBase : public testing::Test {
written_entries_ += num_entries;
if (wait_for_commit) { this->wait_for_all_commits(); }
}
replica_id_t wait_and_get_leader_id() {
do {
auto leader_uuid = dbs_[0]->repl_dev()->get_leader_id();
if (leader_uuid.is_nil()) {
LOGINFO("Waiting for leader to be elected");
std::this_thread::sleep_for(std::chrono::milliseconds{500});
} else {
return leader_uuid;
}
} while (true);
}

ReplServiceError write_with_id(uint64_t id, bool wait_for_commit = true, shared< TestReplicatedDB > db = nullptr) {
if (dbs_[0]->repl_dev() == nullptr) return ReplServiceError::FAILED;
if (db == nullptr) { db = pick_one_db(); }
LOGINFO("Writing data {} since I am the leader my_uuid={}", id,
boost::uuids::to_string(g_helper->my_replica_id()));
auto const block_size = SISL_OPTIONS["block_size"].as< uint32_t >();

LOGINFO("Run on worker threads to schedule append on repldev for {} Bytes.", block_size);
g_helper->runner().set_num_tasks(1);
g_helper->runner().set_task([this, block_size, db, id]() {
static std::normal_distribution<> num_blks_gen{3.0, 1.0};
auto data_size = std::max(1L, std::abs(std::lround(num_blks_gen(g_re)))) * block_size;
ASSERT_GT(data_size, 0);
LOGINFO("data_size larger than 0, go ahead, data_size= {}.", data_size);
static std::atomic<uint32_t> s_uniq_num{0};
auto req = intrusive(new TestReplicatedDB::test_req());
req->jheader.data_size = data_size;
req->jheader.data_pattern = ((long long)rand() << 32) | ++s_uniq_num;
//overwrite the key_id with the id passed in
req->jheader.key_id = id;
req->key_id = id;

LOGINFOMOD(replication, "[Replica={}] Db write key={} data_size={} pattern={} block_size={}",
g_helper->replica_num(), req->key_id, data_size, req->jheader.data_pattern, block_size);

if (data_size != 0) {
req->write_sgs =
test_common::HSTestHelper::create_sgs(data_size, block_size, req->jheader.data_pattern);
}

db->repl_dev()->async_alloc_write(req->header_blob(), req->key_blob(), req->write_sgs, req);
});

if (!wait_for_commit) {
return ReplServiceError::OK;
}
try {
g_helper->runner().execute().get();
LOGDEBUG("write data task complete, id={}", id)
} catch (const ReplServiceError& e) {
LOGERRORMOD(replication, "[Replica={}] Error in writing data: id={}, error={}", g_helper->replica_num(),
id, enum_name(e));
return e;
}

written_entries_ += 1;
LOGINFO("wait_for_commit={}", written_entries_);
this->wait_for_all_commits();
return ReplServiceError::OK;
}

void remove_db(std::shared_ptr< TestReplicatedDB > db, bool wait_for_removal) {
this->run_on_leader(db, [this, db]() {
Expand Down
Loading

0 comments on commit 9f9bd45

Please sign in to comment.