Skip to content

Commit

Permalink
Merge branch 'master' into cp_fix
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxichen authored Dec 23, 2024
2 parents fbb8ad6 + f69e78e commit c96c726
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 44 deletions.
4 changes: 2 additions & 2 deletions src/include/homestore/btree/detail/btree_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,8 @@ VENUM(btree_node_type, uint32_t, FIXED = 0, VAR_VALUE = 1, VAR_KEY = 2, VAR_OBJE
VENUM(btree_store_type, uint8_t, MEM = 0, SSD = 1)
#endif
ENUM(btree_status_t, uint32_t, success, not_found, retry, has_more, node_read_failed, put_failed, space_not_avail,
cp_mismatch, merge_not_required, merge_failed, crc_mismatch, not_supported, node_freed)
ENUM(btree_status_t, uint32_t, success, not_found, retry, has_more, node_read_failed, already_exists, filtered_out,
space_not_avail, cp_mismatch, merge_not_required, merge_failed, crc_mismatch, not_supported, node_freed)
/*ENUM(btree_node_write_type, uint8_t,
new_node, // Node write whenever a new node is created.
Expand Down
8 changes: 3 additions & 5 deletions src/include/homestore/btree/detail/btree_mutate_impl.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ retry:
const auto matched = my_node->match_range(req.working_range(), start_idx, end_idx);
if (!matched) {
BT_NODE_LOG_ASSERT(false, my_node, "match_range returns 0 entries for interior node is not valid pattern");
ret = btree_status_t::put_failed;
ret = btree_status_t::not_found;
goto out;
}
} else if constexpr (std::is_same_v< ReqT, BtreeSinglePutRequest >) {
Expand Down Expand Up @@ -182,10 +182,8 @@ btree_status_t Btree< K, V >::mutate_write_leaf_node(const BtreeNodePtr& my_node
req.shift_working_range();
}
} else if constexpr (std::is_same_v< ReqT, BtreeSinglePutRequest >) {
if (!to_variant_node(my_node)->put(req.key(), req.value(), req.m_put_type, req.m_existing_val,
req.m_filter_cb)) {
ret = btree_status_t::put_failed;
}
ret = to_variant_node(my_node)->put(req.key(), req.value(), req.m_put_type, req.m_existing_val,
req.m_filter_cb);
COUNTER_INCREMENT(m_metrics, btree_obj_count, 1);
}

Expand Down
28 changes: 16 additions & 12 deletions src/include/homestore/btree/detail/variant_node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,14 +192,14 @@ class VariantNode : public StoreSpecificBtreeNode {
/// translates into one of "Insert", "Update" or "Upsert".
/// @param existing_val [optional] A pointer to a value to store the value of the existing entry if it was updated.
/// @param filter_cb [optional] A callback function to be called for each entry found in the node that has a key. It
/// is used as an filter to remove anything that needn't be updated.
/// @return A boolean indicating whether the operation was successful.
/// is used as a filter to remove anything that needn't be updated.
/// @return A status code indicating whether the operation was successful.
///
virtual bool put(BtreeKey const& key, BtreeValue const& val, btree_put_type put_type, BtreeValue* existing_val,
put_filter_cb_t const& filter_cb = nullptr) {
virtual btree_status_t put(BtreeKey const &key, BtreeValue const &val, btree_put_type put_type,
BtreeValue *existing_val, put_filter_cb_t const &filter_cb = nullptr) {
LOGMSG_ASSERT_EQ(magic(), BTREE_NODE_MAGIC, "Magic mismatch on btree_node {}",
get_persistent_header_const()->to_string());
bool ret = true;
auto ret = btree_status_t::success;

DEBUG_ASSERT_EQ(
this->is_leaf(), true,
Expand All @@ -210,22 +210,26 @@ class VariantNode : public StoreSpecificBtreeNode {
if (existing_val) { get_nth_value(idx, existing_val, true); }
if (filter_cb &&
filter_cb(get_nth_key< K >(idx, false), get_nth_value(idx, false), val) !=
put_filter_decision::replace) {
return false;
put_filter_decision::replace) {
LOGINFO("Filter callback rejected the update for key {}", key.to_string());
return btree_status_t::filtered_out;
}
}

if (put_type == btree_put_type::INSERT) {
if (found) {
LOGDEBUG("Attempt to insert duplicate entry {}", key.to_string());
return false;
LOGINFO("Attempt to insert duplicate entry {}", key.to_string());
return btree_status_t::already_exists;
}
ret = (insert(idx, key, val) == btree_status_t::success);
ret = insert(idx, key, val);
} else if (put_type == btree_put_type::UPDATE) {
if (!found) return false;
if (!found) {
LOGINFO("Attempt to update non-existent entry {}", key.to_string());
return btree_status_t::not_found;
}
update(idx, key, val);
} else if (put_type == btree_put_type::UPSERT) {
(found) ? update(idx, key, val) : (void)insert(idx, key, val);
found ? update(idx, key, val) : (void) insert(idx, key, val);
} else {
DEBUG_ASSERT(false, "Wrong put_type {}", put_type);
}
Expand Down
3 changes: 3 additions & 0 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
int64_t lsn() const { return m_lsn; }
bool is_proposer() const { return m_is_proposer; }
journal_type_t op_code() const { return m_op_code; }
bool is_volatile() const { return m_is_volatile.load(); }

sisl::blob const& header() const { return m_header; }
sisl::blob const& key() const { return m_key; }
Expand Down Expand Up @@ -222,6 +223,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::

void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; }
void set_local_blkid(MultiBlkId const& lbid) { m_local_blkid = lbid; } // Only used during recovery
void set_is_volatile(bool is_volatile) { m_is_volatile.store(is_volatile); }
void set_lsn(int64_t lsn);
void add_state(repl_req_state_t s);
bool add_state_if_not_already(repl_req_state_t s);
Expand All @@ -248,6 +250,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
bool m_is_proposer{false}; // Is the repl_req proposed by this node
Clock::time_point m_start_time; // Start time of the request
journal_type_t m_op_code{journal_type_t::HS_DATA_INLINED}; // Operation code for this request
std::atomic< bool > m_is_volatile{true}; // Is the log still in memory and not flushed to disk yet

/////////////// Data related section /////////////////
MultiBlkId m_local_blkid; // Local BlkId for the data
Expand Down
13 changes: 13 additions & 0 deletions src/lib/replication/log_store/repl_log_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,19 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
}
}

// Convert volatile logs to non-volatile logs in state machine
for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
auto rreq = m_sm.lsn_to_req(lsn);
if (rreq != nullptr) {
if (rreq->has_state(repl_req_state_t::ERRORED)) {
RD_LOGE("Raft Channel: rreq=[{}] met some errors before", rreq->to_compact_string());
continue;
}
rreq->set_is_volatile(false);
}
}

sisl::VectorPool< repl_req_ptr_t >::free(reqs);
sisl::VectorPool< repl_req_ptr_t >::free(proposer_reqs);
}
Expand Down
44 changes: 32 additions & 12 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,8 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector<
});
}

bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms) {
bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms,
std::vector< repl_req_ptr_t >* timeout_rreqs) {
std::vector< folly::Future< folly::Unit > > futs;
std::vector< repl_req_ptr_t > only_wait_reqs;
only_wait_reqs.reserve(rreqs.size());
Expand All @@ -621,14 +622,23 @@ bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rre

// We are yet to support reactive fetch from remote.
if (is_resync_mode()) {
check_and_fetch_remote_data(std::move(only_wait_reqs));
check_and_fetch_remote_data(only_wait_reqs);
} else {
m_repl_svc.add_to_fetch_queue(shared_from_this(), std::move(only_wait_reqs));
m_repl_svc.add_to_fetch_queue(shared_from_this(), only_wait_reqs);
}

// block waiting here until all the futs are ready (data channel filled in and promises are made);
auto all_futs = folly::collectAllUnsafe(futs).wait(std::chrono::milliseconds(timeout_ms));
return (all_futs.isReady());
auto all_futs_ready = folly::collectAllUnsafe(futs).wait(std::chrono::milliseconds(timeout_ms)).isReady();
if (!all_futs_ready && timeout_rreqs != nullptr) {
timeout_rreqs->clear();
for (size_t i{0}; i < futs.size(); ++i) {
if (!futs[i].isReady()) {
timeout_rreqs->emplace_back(only_wait_reqs[i]);
}
}
all_futs_ready = timeout_rreqs->empty();
}
return all_futs_ready;
}

void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreqs) {
Expand Down Expand Up @@ -953,18 +963,25 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {

void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) {
if (err == ReplServiceError::OK) { return; }
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);

if (!rreq->add_state_if_not_already(repl_req_state_t::ERRORED)) {
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);
RD_LOGE("Raft Channel: Error has been added for rreq=[{}] error={}", rreq->to_string(), err);
return;
}

// Remove from the map and thus its no longer accessible from applier_create_req
m_repl_key_req_map.erase(rreq->rkey());

if (rreq->op_code() == journal_type_t::HS_DATA_INLINED) {
// Ensure non-volatile lsn not exist because handle_error should not be called after append entries.
auto exist_rreq = m_state_machine->lsn_to_req(rreq->lsn());
if (exist_rreq != nullptr && !exist_rreq->is_volatile()) {
HS_REL_ASSERT(false, "Unexpected: LSN={} is already ready to commit, exist_rreq=[{}]",
rreq->lsn(), exist_rreq->to_string());
}

if (rreq->op_code() == journal_type_t::HS_DATA_LINKED) {
// Free the blks which is allocated already
RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err);
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) {
Expand Down Expand Up @@ -1288,8 +1305,9 @@ nuraft::cb_func::ReturnCode RaftReplDev::raft_event(nuraft::cb_func::Type type,
}

// Wait till we receive the data from its originator for all the requests
if (!wait_for_data_receive(*reqs, HS_DYNAMIC_CONFIG(consensus.data_receive_timeout_ms))) {
for (auto const& rreq : *reqs) {
std::vector< repl_req_ptr_t > timeout_rreqs;
if (!wait_for_data_receive(*reqs, HS_DYNAMIC_CONFIG(consensus.data_receive_timeout_ms), &timeout_rreqs)) {
for (auto const& rreq : timeout_rreqs) {
handle_error(rreq, ReplServiceError::TIMEOUT);
}
ret = nuraft::cb_func::ReturnCode::ReturnNull;
Expand Down Expand Up @@ -1480,20 +1498,22 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
RD_DBG_ASSERT(happened, "rreq already exists for rkey={}", rkey.to_string());
uint32_t data_size{0u};

// If the data is linked and value_size is non-zero, it means blks have been allocated for data.
// Since the log is flushed after data is written, the data has already been received.
if ((jentry->code == journal_type_t::HS_DATA_LINKED) && (jentry->value_size > 0)) {
MultiBlkId entry_blkid;
entry_blkid.deserialize(entry_to_val(jentry), true /* copy */);
data_size = entry_blkid.blk_count() * get_blk_size();
rreq->set_local_blkid(entry_blkid);
rreq->add_state(repl_req_state_t::BLK_ALLOCATED);
rreq->add_state(repl_req_state_t::DATA_RECEIVED);
}

rreq->set_lsn(repl_lsn);
// keep lentry in scope for the lyfe cycle of the rreq
rreq->set_lentry(lentry);
rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size);
// we load the log from log device, implies log flushed. We only flush log after data is written to data device.
rreq->add_state(repl_req_state_t::BLK_ALLOCATED);
rreq->add_state(repl_req_state_t::DATA_RECEIVED);
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
rreq->add_state(repl_req_state_t::LOG_RECEIVED);
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
Expand Down
9 changes: 8 additions & 1 deletion src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,15 @@ class RaftReplDev : public ReplDev,
void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs);
void handle_fetch_data_response(sisl::GenericClientResponse response, std::vector< repl_req_ptr_t > rreqs);
bool is_resync_mode();

/**
* \brief This method handles errors that occur during append entries or data receiving.
* It should not be called after the append entries phase.
*/
void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err);
bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms);

bool wait_for_data_receive(std::vector < repl_req_ptr_t > const &rreqs, uint64_t timeout_ms,
std::vector < repl_req_ptr_t > *timeout_rreqs = nullptr);
void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx);
void commit_blk(repl_req_ptr_t rreq);
void replace_member(repl_req_ptr_t rreq);
Expand Down
11 changes: 7 additions & 4 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,12 @@ void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
rreq->add_state(repl_req_state_t::LOG_RECEIVED);
// reset the rreq created_at time to now https://github.com/eBay/HomeStore/issues/506
rreq->set_created_time();
[[maybe_unused]] auto r = m_lsn_req_map.insert(lsn, std::move(rreq));
RD_DBG_ASSERT_EQ(r.second, true, "lsn={} already in precommit list, exist_term={}", lsn, r.first->second->term());
auto r = m_lsn_req_map.insert(lsn, std::move(rreq));
if (!r.second) {
RD_LOG(ERROR, "lsn={} already in precommit list, exist_term={}, is_volatile={}",
lsn, r.first->second->term(), r.first->second->is_volatile());
// TODO: we need to think about the case where volatile is in the map already, is it safe to overwrite it?
}
}

repl_req_ptr_t RaftStateMachine::lsn_to_req(int64_t lsn) {
Expand Down Expand Up @@ -332,10 +336,9 @@ int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx,

// Listener will read the snapshot data and we pass through the same.
int ret = m_rd.m_listener->read_snapshot_obj(snp_ctx, snp_data);
user_ctx = snp_data->user_ctx; // Have to pass the user_ctx to NuRaft even if ret<0 to get it freed later
if (ret < 0) return ret;

// Update user_ctx and whether is_last_obj
user_ctx = snp_data->user_ctx;
is_last_obj = snp_data->is_last_obj;

// We are doing a copy here.
Expand Down
3 changes: 1 addition & 2 deletions src/tests/btree_helpers/btree_test_helper.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,7 @@ struct BtreeTestHelper {
K key = K{k};
auto sreq = BtreeSinglePutRequest{&key, &value, put_type, existing_v.get()};
sreq.enable_route_tracing();
bool done = expect_success ? (m_bt->put(sreq) == btree_status_t::success)
: m_bt->put(sreq) == btree_status_t::put_failed;
bool done = expect_success == (m_bt->put(sreq) == btree_status_t::success);

if (put_type == btree_put_type::INSERT) {
ASSERT_EQ(done, !m_shadow_map.exists(key));
Expand Down
16 changes: 10 additions & 6 deletions src/tests/test_btree_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,17 @@ struct NodeTest : public testing::Test {
K key{k};
V value{V::generate_rand()};
V existing_v;
bool done = m_node1->put(key, value, put_type, &existing_v);
btree_status_t status = m_node1->put(key, value, put_type, &existing_v);

bool expected_done{true};
if (m_shadow_map.find(key) != m_shadow_map.end()) { expected_done = (put_type != btree_put_type::INSERT); }
ASSERT_EQ(done, expected_done) << "Expected put of key " << k << " of put_type " << enum_name(put_type)
<< " to be " << expected_done;
if (expected_done) {
auto expected_status = btree_status_t::success;
if (m_shadow_map.contains(key)) {
expected_status = put_type != btree_put_type::INSERT
? btree_status_t::success
: btree_status_t::already_exists;
}
ASSERT_EQ(status, expected_status) << "Expected put of key " << k << " of put_type " << enum_name(put_type)
<< " to be " << expected_status;
if (expected_status == btree_status_t::success) {
m_shadow_map.insert(std::make_pair(key, value));
} else {
const auto r = m_shadow_map.find(key);
Expand Down

0 comments on commit c96c726

Please sign in to comment.