From a0df650e0cfb6391cd0dd6816175bcc8b284e944 Mon Sep 17 00:00:00 2001 From: Xiaoxi Chen Date: Wed, 13 Mar 2024 15:03:32 +0800 Subject: [PATCH] Implement on_error() callback for statemachine. (#154) on_error() is only called on the node that proposed the request, i.e. where is_proposer == true. Set the error on the future to notify the caller. Signed-off-by: Xiaoxi Chen --- conanfile.py | 2 +- src/include/homeobject/blob_manager.hpp | 4 +-- src/include/homeobject/pg_manager.hpp | 20 ++++++----- src/include/homeobject/shard_manager.hpp | 4 +-- src/lib/homestore_backend/hs_blob_manager.cpp | 35 ++++++++++++++++++ src/lib/homestore_backend/hs_homeobject.hpp | 3 ++ src/lib/homestore_backend/hs_pg_manager.cpp | 4 +-- .../homestore_backend/hs_shard_manager.cpp | 34 ++++++++++++++++++ .../replication_state_machine.cpp | 36 +++++++++++++++++-- .../tests/homeobj_fixture.hpp | 10 ++++++ 10 files changed, 135 insertions(+), 17 deletions(-) diff --git a/conanfile.py b/conanfile.py index b55cf79b..1b6b60c6 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "1.0.9" + version = "1.0.10" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" topics = ("ebay") diff --git a/src/include/homeobject/blob_manager.hpp b/src/include/homeobject/blob_manager.hpp index 6a3f0fe6..18d1a402 100644 --- a/src/include/homeobject/blob_manager.hpp +++ b/src/include/homeobject/blob_manager.hpp @@ -9,8 +9,8 @@ namespace homeobject { -ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, - READ_FAILED, INDEX_ERROR, SEALED_SHARD); +ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, UNSUPPORTED_OP, NOT_LEADER, REPLICATION_ERROR, + UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD); struct Blob { Blob() = default; diff --git a/src/include/homeobject/pg_manager.hpp 
b/src/include/homeobject/pg_manager.hpp index 61bb8035..22508ad8 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -10,8 +10,8 @@ namespace homeobject { -ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, CRC_MISMATCH, - NO_SPACE_LEFT, DRIVE_WRITE_ERROR); +ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADER, UNKNOWN_PEER, UNSUPPORTED_OP, + CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR); struct PGMember { explicit PGMember(peer_id_t _id) : id(_id) {} @@ -49,7 +49,9 @@ struct PGStats { uint32_t avail_open_shards; // total number of shards that could be opened on this PG; uint64_t used_bytes; // total number of bytes used by all shards on this PG; uint64_t avail_bytes; // total number of bytes available on this PG; - std::vector< std::tuple< peer_id_t, std::string, uint64_t /* last_commit_lsn */, uint64_t /* last_succ_resp_us_ */> > members; + std::vector< + std::tuple< peer_id_t, std::string, uint64_t /* last_commit_lsn */, uint64_t /* last_succ_resp_us_ */ > > + members; std::string to_string() { std::string members_str; @@ -57,13 +59,15 @@ struct PGStats { for (auto const& m : members) { if (i++ > 0) { members_str += ", "; }; members_str += fmt::format("member-{}: id={}, name={}, last_commit_lsn={}, last_succ_resp_us_={}", i, - boost::uuids::to_string(std::get< 0 >(m)), std::get< 1 >(m), std::get< 2 >(m), std::get< 3 >(m)); + boost::uuids::to_string(std::get< 0 >(m)), std::get< 1 >(m), std::get< 2 >(m), + std::get< 3 >(m)); } - return fmt::format("PGStats: id={}, replica_set_uuid={}, leader={}, num_members={}, total_shards={}, open_shards={}, " - "avail_open_shards={}, used_bytes={}, avail_bytes={}, members: {}", - id, boost::uuids::to_string(replica_set_uuid), boost::uuids::to_string(leader_id), num_members, total_shards, open_shards, - avail_open_shards, used_bytes, avail_bytes, members_str); + return fmt::format( + "PGStats: id={}, 
replica_set_uuid={}, leader={}, num_members={}, total_shards={}, open_shards={}, " + "avail_open_shards={}, used_bytes={}, avail_bytes={}, members: {}", + id, boost::uuids::to_string(replica_set_uuid), boost::uuids::to_string(leader_id), num_members, + total_shards, open_shards, avail_open_shards, used_bytes, avail_bytes, members_str); } }; diff --git a/src/include/homeobject/shard_manager.hpp b/src/include/homeobject/shard_manager.hpp index c929e54f..ff3401fe 100644 --- a/src/include/homeobject/shard_manager.hpp +++ b/src/include/homeobject/shard_manager.hpp @@ -9,8 +9,8 @@ namespace homeobject { -ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_PG, UNKNOWN_SHARD, PG_NOT_READY, - CRC_MISMATCH); +ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNSUPPORTED_OP, UNKNOWN_PG, UNKNOWN_SHARD, + PG_NOT_READY, CRC_MISMATCH); struct ShardInfo { enum class State : uint8_t { diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 92267e34..65649859 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -20,6 +20,41 @@ SISL_LOGGING_DECL(blobmgr) #define BLOGC(shard_id, blob_id, msg, ...) 
BLOG(CRITICAL, shard_id, blob_id, msg, ##__VA_ARGS__) namespace homeobject { + +BlobError toBlobError(ReplServiceError const& e) { + switch (e) { + case ReplServiceError::BAD_REQUEST: + [[fallthrough]]; + case ReplServiceError::CANCELLED: + [[fallthrough]]; + case ReplServiceError::CONFIG_CHANGING: + [[fallthrough]]; + case ReplServiceError::SERVER_ALREADY_EXISTS: + [[fallthrough]]; + case ReplServiceError::SERVER_IS_JOINING: + [[fallthrough]]; + case ReplServiceError::SERVER_IS_LEAVING: + [[fallthrough]]; + case ReplServiceError::RESULT_NOT_EXIST_YET: + [[fallthrough]]; + case ReplServiceError::TERM_MISMATCH: + return BlobError::REPLICATION_ERROR; + case ReplServiceError::NOT_LEADER: + return BlobError::NOT_LEADER; + case ReplServiceError::TIMEOUT: + return BlobError::TIMEOUT; + case ReplServiceError::NOT_IMPLEMENTED: + return BlobError::UNSUPPORTED_OP; + case ReplServiceError::OK: + DEBUG_ASSERT(false, "Should not process OK!"); + [[fallthrough]]; + case ReplServiceError::FAILED: + return BlobError::UNKNOWN; + default: + return BlobError::UNKNOWN; + } +} + struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObject::BlobInfo > > { uint32_t blob_header_idx_{0}; diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 2026aabe..ebef83f0 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -25,6 +25,9 @@ using BlobIndexTable = homestore::IndexTable< BlobRouteKey, BlobRouteValue >; class HomeObjCPContext; static constexpr uint64_t io_align{512}; +PGError toPgError(homestore::ReplServiceError const&); +BlobError toBlobError(homestore::ReplServiceError const&); +ShardError toShardError(homestore::ReplServiceError const&); class HSHomeObject : public HomeObjectImpl { /// NOTE: Be wary to change these as they effect on-disk format! 
diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 3e7b4f18..e5d714a1 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -23,11 +23,11 @@ PGError toPgError(ReplServiceError const& e) { [[fallthrough]]; case ReplServiceError::RESULT_NOT_EXIST_YET: [[fallthrough]]; - case ReplServiceError::NOT_LEADER: - [[fallthrough]]; case ReplServiceError::TERM_MISMATCH: case ReplServiceError::NOT_IMPLEMENTED: return PGError::INVALID_ARG; + case ReplServiceError::NOT_LEADER: + return PGError::NOT_LEADER; case ReplServiceError::CANNOT_REMOVE_LEADER: return PGError::UNKNOWN_PEER; case ReplServiceError::TIMEOUT: diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 3004fbc1..0223f07b 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -10,6 +10,40 @@ namespace homeobject { +ShardError toShardError(ReplServiceError const& e) { + switch (e) { + case ReplServiceError::BAD_REQUEST: + [[fallthrough]]; + case ReplServiceError::CANCELLED: + [[fallthrough]]; + case ReplServiceError::CONFIG_CHANGING: + [[fallthrough]]; + case ReplServiceError::SERVER_ALREADY_EXISTS: + [[fallthrough]]; + case ReplServiceError::SERVER_IS_JOINING: + [[fallthrough]]; + case ReplServiceError::SERVER_IS_LEAVING: + [[fallthrough]]; + case ReplServiceError::RESULT_NOT_EXIST_YET: + [[fallthrough]]; + case ReplServiceError::TERM_MISMATCH: + return ShardError::PG_NOT_READY; + case ReplServiceError::NOT_LEADER: + return ShardError::NOT_LEADER; + case ReplServiceError::TIMEOUT: + return ShardError::TIMEOUT; + case ReplServiceError::NOT_IMPLEMENTED: + return ShardError::UNSUPPORTED_OP; + case ReplServiceError::OK: + DEBUG_ASSERT(false, "Should not process OK!"); + [[fallthrough]]; + case ReplServiceError::FAILED: + return ShardError::UNKNOWN; + default: + return ShardError::UNKNOWN; + 
} +} + uint64_t ShardManager::max_shard_size() { return Gi; } uint64_t ShardManager::max_shard_num_in_pg() { return ((uint64_t)0x01) << shard_width; } diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 8331eb6a..90fd4aec 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -46,8 +46,40 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl:: void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key, cintrusive< repl_req_ctx >& ctx) { - LOGE("on_error with :{}, lsn {}", error, ctx->lsn); - // TODO:: block is already freeed at homestore side, handle error if necessay. + RELEASE_ASSERT(ctx, "ctx should not be nullptr in on_error"); + RELEASE_ASSERT(ctx->is_proposer, "on_error should only be called from proposer"); + const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); + LOGE("on_error, message type {} with lsn {}, error {}", msg_header->msg_type, ctx->lsn, error); + switch (msg_header->msg_type) { + case ReplicationMessageType::CREATE_PG_MSG: { + auto result_ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(ctx).get(); + result_ctx->promise_.setValue(folly::makeUnexpected(homeobject::toPgError(error))); + break; + } + case ReplicationMessageType::CREATE_SHARD_MSG: + case ReplicationMessageType::SEAL_SHARD_MSG: { + auto result_ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(ctx).get(); + result_ctx->promise_.setValue(folly::makeUnexpected(toShardError(error))); + break; + } + + case ReplicationMessageType::PUT_BLOB_MSG: { + auto result_ctx = + boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< HSHomeObject::BlobInfo > > >(ctx).get(); + 
result_ctx->promise_.setValue(folly::makeUnexpected(toBlobError(error))); + break; + } + case ReplicationMessageType::DEL_BLOB_MSG: { + auto result_ctx = + boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< HSHomeObject::BlobInfo > > >(ctx).get(); + result_ctx->promise_.setValue(folly::makeUnexpected(toBlobError(error))); + break; + } + default: { + LOGE("Unknown message type, error unhandled , error :{}, lsn {}", error, ctx->lsn); + break; + } + } } homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) { diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index e893544c..42e0c7a7 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -91,6 +91,16 @@ class HomeObjectFixture : public ::testing::Test { homeobject::Blob clone{sisl::io_blob_safe(blob_size, alignment), user_key, 42ul}; std::memcpy(clone.body.bytes(), put_blob.body.bytes(), put_blob.body.size()); auto b = _obj_inst->blob_manager()->put(shard_id, std::move(put_blob)).get(); + if (!b) { + if (b.error() == BlobError::NOT_LEADER) { + LOGINFO("Failed to put blob due to not leader, sleep 1s and continue", pg_id, shard_id); + std::this_thread::sleep_for(1s); + } else { + LOGERROR("Failed to put blob pg {} shard {}", pg_id, shard_id); + ASSERT_TRUE(false); + } + continue; + } ASSERT_TRUE(!!b); auto blob_id = b.value();