diff --git a/src/include/homestore/btree/detail/btree_internal.hpp b/src/include/homestore/btree/detail/btree_internal.hpp index 8989a2d5d..14533a8e5 100644 --- a/src/include/homestore/btree/detail/btree_internal.hpp +++ b/src/include/homestore/btree/detail/btree_internal.hpp @@ -201,8 +201,8 @@ VENUM(btree_node_type, uint32_t, FIXED = 0, VAR_VALUE = 1, VAR_KEY = 2, VAR_OBJE VENUM(btree_store_type, uint8_t, MEM = 0, SSD = 1) #endif -ENUM(btree_status_t, uint32_t, success, not_found, retry, has_more, node_read_failed, put_failed, space_not_avail, - cp_mismatch, merge_not_required, merge_failed, crc_mismatch, not_supported, node_freed) +ENUM(btree_status_t, uint32_t, success, not_found, retry, has_more, node_read_failed, already_exists, filtered_out, + space_not_avail, cp_mismatch, merge_not_required, merge_failed, crc_mismatch, not_supported, node_freed) /*ENUM(btree_node_write_type, uint8_t, new_node, // Node write whenever a new node is created. diff --git a/src/include/homestore/btree/detail/btree_mutate_impl.ipp b/src/include/homestore/btree/detail/btree_mutate_impl.ipp index 3e90ccfd5..3cfc19a18 100644 --- a/src/include/homestore/btree/detail/btree_mutate_impl.ipp +++ b/src/include/homestore/btree/detail/btree_mutate_impl.ipp @@ -56,7 +56,7 @@ retry: const auto matched = my_node->match_range(req.working_range(), start_idx, end_idx); if (!matched) { BT_NODE_LOG_ASSERT(false, my_node, "match_range returns 0 entries for interior node is not valid pattern"); - ret = btree_status_t::put_failed; + ret = btree_status_t::not_found; goto out; } } else if constexpr (std::is_same_v< ReqT, BtreeSinglePutRequest >) { @@ -182,10 +182,8 @@ btree_status_t Btree< K, V >::mutate_write_leaf_node(const BtreeNodePtr& my_node req.shift_working_range(); } } else if constexpr (std::is_same_v< ReqT, BtreeSinglePutRequest >) { - if (!to_variant_node(my_node)->put(req.key(), req.value(), req.m_put_type, req.m_existing_val, - req.m_filter_cb)) { - ret = btree_status_t::put_failed; - } + ret = to_variant_node(my_node)->put(req.key(), req.value(), req.m_put_type, req.m_existing_val, + req.m_filter_cb); COUNTER_INCREMENT(m_metrics, btree_obj_count, 1); } diff --git a/src/include/homestore/btree/detail/variant_node.hpp b/src/include/homestore/btree/detail/variant_node.hpp index 0814f6187..004313ce1 100644 --- a/src/include/homestore/btree/detail/variant_node.hpp +++ b/src/include/homestore/btree/detail/variant_node.hpp @@ -192,14 +192,14 @@ class VariantNode : public StoreSpecificBtreeNode { /// translates into one of "Insert", "Update" or "Upsert". /// @param existing_val [optional] A pointer to a value to store the value of the existing entry if it was updated. /// @param filter_cb [optional] A callback function to be called for each entry found in the node that has a key. It - /// is used as an filter to remove anything that needn't be updated. - /// @return A boolean indicating whether the operation was successful. + /// is used as a filter to remove anything that needn't be updated. + /// @return A status code indicating whether the operation was successful. /// - virtual bool put(BtreeKey const& key, BtreeValue const& val, btree_put_type put_type, BtreeValue* existing_val, - put_filter_cb_t const& filter_cb = nullptr) { + virtual btree_status_t put(BtreeKey const &key, BtreeValue const &val, btree_put_type put_type, + BtreeValue *existing_val, put_filter_cb_t const &filter_cb = nullptr) { LOGMSG_ASSERT_EQ(magic(), BTREE_NODE_MAGIC, "Magic mismatch on btree_node {}", get_persistent_header_const()->to_string()); - bool ret = true; + auto ret = btree_status_t::success; DEBUG_ASSERT_EQ( this->is_leaf(), true, @@ -210,22 +210,26 @@ class VariantNode : public StoreSpecificBtreeNode { if (existing_val) { get_nth_value(idx, existing_val, true); } if (filter_cb && filter_cb(get_nth_key< K >(idx, false), get_nth_value(idx, false), val) != - put_filter_decision::replace) { - return false; + put_filter_decision::replace) { + LOGINFO("Filter callback rejected the update for key {}", key.to_string()); + return btree_status_t::filtered_out; } } if (put_type == btree_put_type::INSERT) { if (found) { - LOGDEBUG("Attempt to insert duplicate entry {}", key.to_string()); - return false; + LOGINFO("Attempt to insert duplicate entry {}", key.to_string()); + return btree_status_t::already_exists; } - ret = (insert(idx, key, val) == btree_status_t::success); + ret = insert(idx, key, val); } else if (put_type == btree_put_type::UPDATE) { - if (!found) return false; + if (!found) { + LOGINFO("Attempt to update non-existent entry {}", key.to_string()); + return btree_status_t::not_found; + } update(idx, key, val); } else if (put_type == btree_put_type::UPSERT) { - (found) ? update(idx, key, val) : (void)insert(idx, key, val); + found ? update(idx, key, val) : (void) insert(idx, key, val); } else { DEBUG_ASSERT(false, "Wrong put_type {}", put_type); } diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index ec8344be0..db79b5f9c 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -152,6 +152,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: int64_t lsn() const { return m_lsn; } bool is_proposer() const { return m_is_proposer; } journal_type_t op_code() const { return m_op_code; } + bool is_volatile() const { return m_is_volatile.load(); } sisl::blob const& header() const { return m_header; } sisl::blob const& key() const { return m_key; } @@ -222,6 +223,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; } void set_local_blkid(MultiBlkId const& lbid) { m_local_blkid = lbid; } // Only used during recovery + void set_is_volatile(bool is_volatile) { m_is_volatile.store(is_volatile); } void set_lsn(int64_t lsn); void add_state(repl_req_state_t s); bool add_state_if_not_already(repl_req_state_t s); @@ -248,6 +250,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost:: bool m_is_proposer{false}; // Is the repl_req proposed by this node Clock::time_point m_start_time; // Start time of the request journal_type_t m_op_code{journal_type_t::HS_DATA_INLINED}; // Operation code for this request + std::atomic< bool > m_is_volatile{true}; // Is the log still in memory and not flushed to disk yet /////////////// Data related section ///////////////// MultiBlkId m_local_blkid; // Local BlkId for the data diff --git a/src/lib/replication/log_store/repl_log_store.cpp b/src/lib/replication/log_store/repl_log_store.cpp index 36cec9370..97d70ff92 100644 --- a/src/lib/replication/log_store/repl_log_store.cpp +++ b/src/lib/replication/log_store/repl_log_store.cpp @@ -93,6 +93,19 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) { if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); } } } + + // Convert volatile logs to non-volatile logs in state machine + for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) { + auto rreq = m_sm.lsn_to_req(lsn); + if (rreq != nullptr) { + if (rreq->has_state(repl_req_state_t::ERRORED)) { + RD_LOGE("Raft Channel: rreq=[{}] met some errors before", rreq->to_compact_string()); + continue; + } + rreq->set_is_volatile(false); + } + } + sisl::VectorPool< repl_req_ptr_t >::free(reqs); sisl::VectorPool< repl_req_ptr_t >::free(proposer_reqs); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 2449f7833..1270ed761 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -594,7 +594,8 @@ folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< }); } -bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms) { +bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms, + std::vector< repl_req_ptr_t >* timeout_rreqs) { std::vector< folly::Future< folly::Unit > > futs; std::vector< repl_req_ptr_t > only_wait_reqs; only_wait_reqs.reserve(rreqs.size()); @@ -621,14 +622,23 @@ bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rre // We are yet to support reactive fetch from remote. if (is_resync_mode()) { - check_and_fetch_remote_data(std::move(only_wait_reqs)); + check_and_fetch_remote_data(only_wait_reqs); } else { - m_repl_svc.add_to_fetch_queue(shared_from_this(), std::move(only_wait_reqs)); + m_repl_svc.add_to_fetch_queue(shared_from_this(), only_wait_reqs); } // block waiting here until all the futs are ready (data channel filled in and promises are made); - auto all_futs = folly::collectAllUnsafe(futs).wait(std::chrono::milliseconds(timeout_ms)); - return (all_futs.isReady()); + auto all_futs_ready = folly::collectAllUnsafe(futs).wait(std::chrono::milliseconds(timeout_ms)).isReady(); + if (!all_futs_ready && timeout_rreqs != nullptr) { + timeout_rreqs->clear(); + for (size_t i{0}; i < futs.size(); ++i) { + if (!futs[i].isReady()) { + timeout_rreqs->emplace_back(only_wait_reqs[i]); + } + } + all_futs_ready = timeout_rreqs->empty(); + } + return all_futs_ready; } void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreqs) { @@ -953,18 +963,25 @@ void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) { void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) { if (err == ReplServiceError::OK) { return; } + RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err); if (!rreq->add_state_if_not_already(repl_req_state_t::ERRORED)) { - RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err); + RD_LOGE("Raft Channel: Error has been added for rreq=[{}] error={}", rreq->to_string(), err); return; } // Remove from the map and thus its no longer accessible from applier_create_req m_repl_key_req_map.erase(rreq->rkey()); - if (rreq->op_code() == journal_type_t::HS_DATA_INLINED) { + // Ensure non-volatile lsn not exist because handle_error should not be called after append entries. + auto exist_rreq = m_state_machine->lsn_to_req(rreq->lsn()); + if (exist_rreq != nullptr && !exist_rreq->is_volatile()) { + HS_REL_ASSERT(false, "Unexpected: LSN={} is already ready to commit, exist_rreq=[{}]", + rreq->lsn(), exist_rreq->to_string()); + } + + if (rreq->op_code() == journal_type_t::HS_DATA_LINKED) { // Free the blks which is allocated already - RD_LOGE("Raft Channel: Error in processing rreq=[{}] error={}", rreq->to_string(), err); if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) { auto blkid = rreq->local_blkid(); data_service().async_free_blk(blkid).thenValue([blkid](auto&& err) { @@ -1288,8 +1305,9 @@ nuraft::cb_func::ReturnCode RaftReplDev::raft_event(nuraft::cb_func::Type type, } // Wait till we receive the data from its originator for all the requests - if (!wait_for_data_receive(*reqs, HS_DYNAMIC_CONFIG(consensus.data_receive_timeout_ms))) { - for (auto const& rreq : *reqs) { + std::vector< repl_req_ptr_t > timeout_rreqs; + if (!wait_for_data_receive(*reqs, HS_DYNAMIC_CONFIG(consensus.data_receive_timeout_ms), &timeout_rreqs)) { + for (auto const& rreq : timeout_rreqs) { handle_error(rreq, ReplServiceError::TIMEOUT); } ret = nuraft::cb_func::ReturnCode::ReturnNull; @@ -1480,11 +1498,15 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx RD_DBG_ASSERT(happened, "rreq already exists for rkey={}", rkey.to_string()); uint32_t data_size{0u}; + // If the data is linked and value_size is non-zero, it means blks have been allocated for data. + // Since the log is flushed after data is written, the data has already been received. if ((jentry->code == journal_type_t::HS_DATA_LINKED) && (jentry->value_size > 0)) { MultiBlkId entry_blkid; entry_blkid.deserialize(entry_to_val(jentry), true /* copy */); data_size = entry_blkid.blk_count() * get_blk_size(); rreq->set_local_blkid(entry_blkid); + rreq->add_state(repl_req_state_t::BLK_ALLOCATED); + rreq->add_state(repl_req_state_t::DATA_RECEIVED); } rreq->set_lsn(repl_lsn); @@ -1492,8 +1514,6 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx rreq->set_lentry(lentry); rreq->init(rkey, jentry->code, false /* is_proposer */, entry_to_hdr(jentry), entry_to_key(jentry), data_size); // we load the log from log device, implies log flushed. We only flush log after data is written to data device. - rreq->add_state(repl_req_state_t::BLK_ALLOCATED); - rreq->add_state(repl_req_state_t::DATA_RECEIVED); rreq->add_state(repl_req_state_t::DATA_WRITTEN); rreq->add_state(repl_req_state_t::LOG_RECEIVED); rreq->add_state(repl_req_state_t::LOG_FLUSHED); diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index e9ec2a1ad..28706f716 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -306,8 +306,15 @@ class RaftReplDev : public ReplDev, void fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs); void handle_fetch_data_response(sisl::GenericClientResponse response, std::vector< repl_req_ptr_t > rreqs); bool is_resync_mode(); + + /** + * \brief This method handles errors that occur during append entries or data receiving. + * It should not be called after the append entries phase. + */ void handle_error(repl_req_ptr_t const& rreq, ReplServiceError err); - bool wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms); + + bool wait_for_data_receive(std::vector < repl_req_ptr_t > const &rreqs, uint64_t timeout_ms, + std::vector < repl_req_ptr_t > *timeout_rreqs = nullptr); void on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx); void commit_blk(repl_req_ptr_t rreq); void replace_member(repl_req_ptr_t rreq); diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 09bd6b7ba..10fb9285f 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -293,8 +293,12 @@ void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) { rreq->add_state(repl_req_state_t::LOG_RECEIVED); // reset the rreq created_at time to now https://github.com/eBay/HomeStore/issues/506 rreq->set_created_time(); - [[maybe_unused]] auto r = m_lsn_req_map.insert(lsn, std::move(rreq)); - RD_DBG_ASSERT_EQ(r.second, true, "lsn={} already in precommit list, exist_term={}", lsn, r.first->second->term()); + auto r = m_lsn_req_map.insert(lsn, std::move(rreq)); + if (!r.second) { + RD_LOG(ERROR, "lsn={} already in precommit list, exist_term={}, is_volatile={}", + lsn, r.first->second->term(), r.first->second->is_volatile()); + // TODO: we need to think about the case where volatile is in the map already, is it safe to overwrite it? + } } repl_req_ptr_t RaftStateMachine::lsn_to_req(int64_t lsn) { @@ -332,10 +336,9 @@ int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, // Listener will read the snapshot data and we pass through the same. int ret = m_rd.m_listener->read_snapshot_obj(snp_ctx, snp_data); + user_ctx = snp_data->user_ctx; // Have to pass the user_ctx to NuRaft even if ret<0 to get it freed later if (ret < 0) return ret; - // Update user_ctx and whether is_last_obj - user_ctx = snp_data->user_ctx; is_last_obj = snp_data->is_last_obj; // We are doing a copy here. diff --git a/src/tests/btree_helpers/btree_test_helper.hpp b/src/tests/btree_helpers/btree_test_helper.hpp index a047fed23..1480f5358 100644 --- a/src/tests/btree_helpers/btree_test_helper.hpp +++ b/src/tests/btree_helpers/btree_test_helper.hpp @@ -439,8 +439,7 @@ struct BtreeTestHelper { K key = K{k}; auto sreq = BtreeSinglePutRequest{&key, &value, put_type, existing_v.get()}; sreq.enable_route_tracing(); - bool done = expect_success ? (m_bt->put(sreq) == btree_status_t::success) - : m_bt->put(sreq) == btree_status_t::put_failed; + bool done = expect_success == (m_bt->put(sreq) == btree_status_t::success); if (put_type == btree_put_type::INSERT) { ASSERT_EQ(done, !m_shadow_map.exists(key)); diff --git a/src/tests/test_btree_node.cpp b/src/tests/test_btree_node.cpp index 4eb775572..2b1a02e71 100644 --- a/src/tests/test_btree_node.cpp +++ b/src/tests/test_btree_node.cpp @@ -88,13 +88,17 @@ struct NodeTest : public testing::Test { K key{k}; V value{V::generate_rand()}; V existing_v; - bool done = m_node1->put(key, value, put_type, &existing_v); + btree_status_t status = m_node1->put(key, value, put_type, &existing_v); - bool expected_done{true}; - if (m_shadow_map.find(key) != m_shadow_map.end()) { expected_done = (put_type != btree_put_type::INSERT); } - ASSERT_EQ(done, expected_done) << "Expected put of key " << k << " of put_type " << enum_name(put_type) - << " to be " << expected_done; - if (expected_done) { + auto expected_status = btree_status_t::success; + if (m_shadow_map.contains(key)) { + expected_status = put_type != btree_put_type::INSERT + ? btree_status_t::success + : btree_status_t::already_exists; + } + ASSERT_EQ(status, expected_status) << "Expected put of key " << k << " of put_type " << enum_name(put_type) + << " to be " << expected_status; + if (expected_status == btree_status_t::success) { m_shadow_map.insert(std::make_pair(key, value)); } else { const auto r = m_shadow_map.find(key);