Skip to content

Commit

Permalink
Implement on_error() callback for statemachine. (#154)
Browse files Browse the repository at this point in the history
on_error() is only called on the node that proposed the request, i.e.
is_proposer == true. The error is set on the future
to notify the caller.

Signed-off-by: Xiaoxi Chen <[email protected]>
  • Loading branch information
xiaoxichen authored Mar 13, 2024
1 parent 5f98b10 commit a0df650
Show file tree
Hide file tree
Showing 10 changed files with 135 additions and 17 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "1.0.9"
version = "1.0.10"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
Expand Down
4 changes: 2 additions & 2 deletions src/include/homeobject/blob_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

namespace homeobject {

ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH,
READ_FAILED, INDEX_ERROR, SEALED_SHARD);
ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, UNSUPPORTED_OP, NOT_LEADER, REPLICATION_ERROR,
UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH, READ_FAILED, INDEX_ERROR, SEALED_SHARD);

struct Blob {
Blob() = default;
Expand Down
20 changes: 12 additions & 8 deletions src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

namespace homeobject {

ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, UNKNOWN_PEER, UNSUPPORTED_OP, CRC_MISMATCH,
NO_SPACE_LEFT, DRIVE_WRITE_ERROR);
ENUM(PGError, uint16_t, UNKNOWN = 1, INVALID_ARG, TIMEOUT, UNKNOWN_PG, NOT_LEADER, UNKNOWN_PEER, UNSUPPORTED_OP,
CRC_MISMATCH, NO_SPACE_LEFT, DRIVE_WRITE_ERROR);

struct PGMember {
explicit PGMember(peer_id_t _id) : id(_id) {}
Expand Down Expand Up @@ -49,21 +49,25 @@ struct PGStats {
uint32_t avail_open_shards; // total number of shards that could be opened on this PG;
uint64_t used_bytes; // total number of bytes used by all shards on this PG;
uint64_t avail_bytes; // total number of bytes available on this PG;
std::vector< std::tuple< peer_id_t, std::string, uint64_t /* last_commit_lsn */, uint64_t /* last_succ_resp_us_ */> > members;
std::vector<
std::tuple< peer_id_t, std::string, uint64_t /* last_commit_lsn */, uint64_t /* last_succ_resp_us_ */ > >
members;

std::string to_string() {
std::string members_str;
uint32_t i = 0ul;
for (auto const& m : members) {
if (i++ > 0) { members_str += ", "; };
members_str += fmt::format("member-{}: id={}, name={}, last_commit_lsn={}, last_succ_resp_us_={}", i,
boost::uuids::to_string(std::get< 0 >(m)), std::get< 1 >(m), std::get< 2 >(m), std::get< 3 >(m));
boost::uuids::to_string(std::get< 0 >(m)), std::get< 1 >(m), std::get< 2 >(m),
std::get< 3 >(m));
}

return fmt::format("PGStats: id={}, replica_set_uuid={}, leader={}, num_members={}, total_shards={}, open_shards={}, "
"avail_open_shards={}, used_bytes={}, avail_bytes={}, members: {}",
id, boost::uuids::to_string(replica_set_uuid), boost::uuids::to_string(leader_id), num_members, total_shards, open_shards,
avail_open_shards, used_bytes, avail_bytes, members_str);
return fmt::format(
"PGStats: id={}, replica_set_uuid={}, leader={}, num_members={}, total_shards={}, open_shards={}, "
"avail_open_shards={}, used_bytes={}, avail_bytes={}, members: {}",
id, boost::uuids::to_string(replica_set_uuid), boost::uuids::to_string(leader_id), num_members,
total_shards, open_shards, avail_open_shards, used_bytes, avail_bytes, members_str);
}
};

Expand Down
4 changes: 2 additions & 2 deletions src/include/homeobject/shard_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@

namespace homeobject {

ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_PG, UNKNOWN_SHARD, PG_NOT_READY,
CRC_MISMATCH);
ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNSUPPORTED_OP, UNKNOWN_PG, UNKNOWN_SHARD,
PG_NOT_READY, CRC_MISMATCH);

struct ShardInfo {
enum class State : uint8_t {
Expand Down
35 changes: 35 additions & 0 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,41 @@ SISL_LOGGING_DECL(blobmgr)
#define BLOGC(shard_id, blob_id, msg, ...) BLOG(CRITICAL, shard_id, blob_id, msg, ##__VA_ARGS__)

namespace homeobject {

/// Translates a replication-service error into the BlobError reported by the blob manager.
///
/// @param e  replication error to translate; ReplServiceError::OK is not expected (debug builds assert).
/// @return   the corresponding BlobError. Values without a dedicated mapping yield BlobError::UNKNOWN.
BlobError toBlobError(ReplServiceError const& e) {
    switch (e) {
    // Errors with a direct BlobError counterpart.
    case ReplServiceError::NOT_LEADER:
        return BlobError::NOT_LEADER;
    case ReplServiceError::TIMEOUT:
        return BlobError::TIMEOUT;
    case ReplServiceError::NOT_IMPLEMENTED:
        return BlobError::UNSUPPORTED_OP;

    // Everything below is reported as a generic replication failure.
    case ReplServiceError::BAD_REQUEST:
    case ReplServiceError::CANCELLED:
    case ReplServiceError::CONFIG_CHANGING:
    case ReplServiceError::SERVER_ALREADY_EXISTS:
    case ReplServiceError::SERVER_IS_JOINING:
    case ReplServiceError::SERVER_IS_LEAVING:
    case ReplServiceError::RESULT_NOT_EXIST_YET:
    case ReplServiceError::TERM_MISMATCH:
        return BlobError::REPLICATION_ERROR;

    // OK is not an error; flag it in debug builds and otherwise treat it like FAILED.
    case ReplServiceError::OK:
        DEBUG_ASSERT(false, "Should not process OK!");
        [[fallthrough]];
    case ReplServiceError::FAILED:
    default:
        return BlobError::UNKNOWN;
    }
}

struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObject::BlobInfo > > {
uint32_t blob_header_idx_{0};

Expand Down
3 changes: 3 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ using BlobIndexTable = homestore::IndexTable< BlobRouteKey, BlobRouteValue >;
class HomeObjCPContext;

static constexpr uint64_t io_align{512};
// Converters from homestore::ReplServiceError to the error enum of each manager (PGError, BlobError,
// ShardError). They are declared here so callers outside the individual manager sources can use them,
// e.g. ReplicationStateMachine::on_error.
PGError toPgError(homestore::ReplServiceError const&);
BlobError toBlobError(homestore::ReplServiceError const&);
ShardError toShardError(homestore::ReplServiceError const&);

class HSHomeObject : public HomeObjectImpl {
/// NOTE: Be wary to change these as they effect on-disk format!
Expand Down
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@ PGError toPgError(ReplServiceError const& e) {
[[fallthrough]];
case ReplServiceError::RESULT_NOT_EXIST_YET:
[[fallthrough]];
case ReplServiceError::NOT_LEADER:
[[fallthrough]];
case ReplServiceError::TERM_MISMATCH:
case ReplServiceError::NOT_IMPLEMENTED:
return PGError::INVALID_ARG;
case ReplServiceError::NOT_LEADER:
return PGError::NOT_LEADER;
case ReplServiceError::CANNOT_REMOVE_LEADER:
return PGError::UNKNOWN_PEER;
case ReplServiceError::TIMEOUT:
Expand Down
34 changes: 34 additions & 0 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,40 @@

namespace homeobject {

/// Translates a replication-service error into the ShardError reported by the shard manager.
///
/// @param e  replication error to translate; ReplServiceError::OK is not expected (debug builds assert).
/// @return   the corresponding ShardError. Values without a dedicated mapping yield ShardError::UNKNOWN.
ShardError toShardError(ReplServiceError const& e) {
    switch (e) {
    // Errors with a direct ShardError counterpart.
    case ReplServiceError::NOT_LEADER:
        return ShardError::NOT_LEADER;
    case ReplServiceError::TIMEOUT:
        return ShardError::TIMEOUT;
    case ReplServiceError::NOT_IMPLEMENTED:
        return ShardError::UNSUPPORTED_OP;

    // Everything below is reported as the PG not being ready.
    case ReplServiceError::BAD_REQUEST:
    case ReplServiceError::CANCELLED:
    case ReplServiceError::CONFIG_CHANGING:
    case ReplServiceError::SERVER_ALREADY_EXISTS:
    case ReplServiceError::SERVER_IS_JOINING:
    case ReplServiceError::SERVER_IS_LEAVING:
    case ReplServiceError::RESULT_NOT_EXIST_YET:
    case ReplServiceError::TERM_MISMATCH:
        return ShardError::PG_NOT_READY;

    // OK is not an error; flag it in debug builds and otherwise treat it like FAILED.
    case ReplServiceError::OK:
        DEBUG_ASSERT(false, "Should not process OK!");
        [[fallthrough]];
    case ReplServiceError::FAILED:
    default:
        return ShardError::UNKNOWN;
    }
}

/// Returns the maximum shard size.
/// NOTE(review): `Gi` is defined outside this view — presumably one gibibyte in bytes; confirm.
uint64_t ShardManager::max_shard_size() {
    return Gi;
}

/// Returns the maximum number of shards in a PG, computed as 2^shard_width.
/// NOTE(review): `shard_width` is defined outside this view — presumably the bit width of the shard
/// portion of a shard id; confirm.
uint64_t ShardManager::max_shard_num_in_pg() {
    return static_cast< uint64_t >(0x01) << shard_width;
}
Expand Down
36 changes: 34 additions & 2 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,40 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl::

/// Reports a failed replication request back to whoever is waiting on it.
///
/// The message type stored in `header` selects the concrete result context type; the replication error
/// is converted to the matching manager-level error and set on that context's promise.
///
/// @param error   replication error being reported.
/// @param header  serialized ReplicationMessageHeader of the failed request.
/// @param key     unused here.
/// @param ctx     request context; must be non-null and must belong to the proposer.
void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
                                       cintrusive< repl_req_ctx >& ctx) {
    // Validate ctx before anything (including logging) reads through it.
    RELEASE_ASSERT(ctx, "ctx should not be nullptr in on_error");
    RELEASE_ASSERT(ctx->is_proposer, "on_error should only be called from proposer");
    const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
    LOGE("on_error, message type {} with lsn {}, error {}", msg_header->msg_type, ctx->lsn, error);

    switch (msg_header->msg_type) {
    case ReplicationMessageType::CREATE_PG_MSG: {
        auto result_ctx = boost::static_pointer_cast< repl_result_ctx< PGManager::NullResult > >(ctx).get();
        result_ctx->promise_.setValue(folly::makeUnexpected(homeobject::toPgError(error)));
        break;
    }
    case ReplicationMessageType::CREATE_SHARD_MSG:
    case ReplicationMessageType::SEAL_SHARD_MSG: {
        auto result_ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(ctx).get();
        result_ctx->promise_.setValue(folly::makeUnexpected(toShardError(error)));
        break;
    }
    // Put and delete use the same result context type, so they share one handler.
    case ReplicationMessageType::PUT_BLOB_MSG:
    case ReplicationMessageType::DEL_BLOB_MSG: {
        auto result_ctx =
            boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< HSHomeObject::BlobInfo > > >(ctx).get();
        result_ctx->promise_.setValue(folly::makeUnexpected(toBlobError(error)));
        break;
    }
    default: {
        // NOTE(review): no promise is completed on this path — confirm nothing can be waiting on it.
        LOGE("Unknown message type, error unhandled , error :{}, lsn {}", error, ctx->lsn);
        break;
    }
    }
}

homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) {
Expand Down
10 changes: 10 additions & 0 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,16 @@ class HomeObjectFixture : public ::testing::Test {
homeobject::Blob clone{sisl::io_blob_safe(blob_size, alignment), user_key, 42ul};
std::memcpy(clone.body.bytes(), put_blob.body.bytes(), put_blob.body.size());
auto b = _obj_inst->blob_manager()->put(shard_id, std::move(put_blob)).get();
if (!b) {
if (b.error() == BlobError::NOT_LEADER) {
LOGINFO("Failed to put blob due to not leader, sleep 1s and continue", pg_id, shard_id);
std::this_thread::sleep_for(1s);
} else {
LOGERROR("Failed to put blob pg {} shard {}", pg_id, shard_id);
ASSERT_TRUE(false);
}
continue;
}
ASSERT_TRUE(!!b);
auto blob_id = b.value();

Expand Down

0 comments on commit a0df650

Please sign in to comment.