Skip to content

Commit

Permalink
SealShard state change in pre-commit (#120)
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 authored Apr 5, 2024
1 parent c6f7d23 commit 3b05a73
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 14 deletions.
13 changes: 5 additions & 8 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ class HSHomeObject : public HomeObjectImpl {
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);

bool on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_shard_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);

/**
* @brief Retrieves the chunk number associated with the given shard ID.
*
Expand All @@ -369,14 +374,6 @@ class HSHomeObject : public HomeObjectImpl {

cshared< HeapChunkSelector > chunk_selector() const { return chunk_selector_; }

//////////// Called by internal classes. These are not Public APIs ///////////////////
bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >&);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

// Blob manager related.
void on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
Expand Down
89 changes: 87 additions & 2 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,90 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const
return req->result();
}

// Moving seal_shard handling into pre_commit cannot fundamentally resolve the conflict between
// seal_shard and put_blob: the put_blob handler checks the shard state only once, at the very
// beginning, and does not re-check before proposing to raft. What we really need is a callback
// that decides whether a request can be handled *before* the log entry is appended, i.e. earlier
// than pre_commit.

// FIXME: revisit once homestore provides that pre-append callback (in progress upstream).

/// Pre-commit hook for shard replication messages.
///
/// For SEAL_SHARD_MSG we flip the in-memory shard state OPEN -> SEALED so that any
/// put_blob arriving after this point fails fast on the sealed shard; the durable
/// superblk update is deferred to on_shard_message_commit. All other message types
/// are a no-op here.
///
/// @param lsn     log sequence number of the replicated entry (used for logging).
/// @param header  replication message header followed by a shard_info_superblk payload.
/// @param key     unused for seal-shard messages.
/// @param hs_ctx  request context; carries the proposer's result promise, if any.
/// @return false only when the header fails its CRC check; true otherwise.
bool HSHomeObject::on_shard_message_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                               cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
    // Only the proposer owns a result context whose promise must be fulfilled on error.
    if (hs_ctx && hs_ctx->is_proposer) {
        ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
    }
    const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
    if (msg_header->corrupted()) {
        LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
        return false;
    }
    switch (msg_header->msg_type) {
    case ReplicationMessageType::SEAL_SHARD_MSG: {
        auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
        auto const shard_info = sb->info;

        {
            std::scoped_lock lock_guard(_shard_lock);
            auto iter = _shard_map.find(shard_info.id);
            RELEASE_ASSERT(iter != _shard_map.end(), "Missing shard info");
            auto& state = (*iter->second)->info.state;
            // we just change the state to SEALED, so that it will fail the later coming put_blob on this shard and will
            // be easy for rollback.
            // the update of superblk will be done in on_shard_message_commit;
            if (state == ShardInfo::State::OPEN) {
                state = ShardInfo::State::SEALED;
            } else {
                LOGW("try to seal an unopen shard, shard_id: {}", shard_info.id);
            }
        }
        break; // previously fell through into default; make the case boundary explicit
    }
    default: {
        break;
    }
    }
    return true;
}

/// Rollback hook for shard replication messages: undoes the state change made in
/// on_shard_message_pre_commit when the raft entry fails to commit.
///
/// For SEAL_SHARD_MSG this reverts the in-memory shard state SEALED -> OPEN. No
/// superblk was written in pre-commit, so only the in-memory map needs repair.
///
/// @param lsn     log sequence number of the rolled-back entry (used for logging).
/// @param header  replication message header followed by a shard_info_superblk payload.
/// @param key     unused for seal-shard messages.
/// @param hs_ctx  request context; carries the proposer's result promise, if any.
void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                             cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
    // Only the proposer owns a result context whose promise must be fulfilled on error.
    if (hs_ctx && hs_ctx->is_proposer) {
        ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get();
    }
    const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
    if (msg_header->corrupted()) {
        LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
        return;
    }

    switch (msg_header->msg_type) {
    case ReplicationMessageType::SEAL_SHARD_MSG: {
        auto sb = r_cast< shard_info_superblk const* >(header.cbytes() + sizeof(ReplicationMessageHeader));
        auto const shard_info = sb->info;
        {
            std::scoped_lock lock_guard(_shard_lock);
            auto iter = _shard_map.find(shard_info.id);
            RELEASE_ASSERT(iter != _shard_map.end(), "Missing shard info");
            auto& state = (*iter->second)->info.state;
            // revert the pre-commit transition back to OPEN; no superblk was touched in
            // pre-commit, so restoring the in-memory state completes the rollback.
            if (state == ShardInfo::State::SEALED) {
                state = ShardInfo::State::OPEN;
            } else {
                LOGW("try to rollback seal_shard message , but the shard state is not sealed. shard_id: {}",
                     shard_info.id);
            }
        }
        break; // previously fell through into default; make the case boundary explicit
    }
    default: {
        break;
    }
    }
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -259,12 +343,13 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
state = (*iter->second)->info.state;
}

if (state == ShardInfo::State::OPEN) {
if (state == ShardInfo::State::SEALED) {
auto chunk_id = get_shard_chunk(shard_info.id);
RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found");
chunk_selector()->release_chunk(chunk_id.value());
update_shard_in_map(shard_info);
}
} else
LOGW("try to commit SEAL_SHARD_MSG but shard state is not sealed, shard_id: {}", shard_info.id);
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
break;
}
Expand Down
35 changes: 31 additions & 4 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,45 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
}
}

bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& ctx) {
LOGI("on_pre_commit with lsn:{}", lsn);
// For shard creation, since homestore repldev inside will write shard header to data service first before this
// function is called. So there is nothing is needed to do and we can get the binding chunk_id with the newly shard
// from the blkid in on_commit()
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
if (msg_header->corrupted()) {
LOGE("corrupted message in pre_commit, lsn:{}", lsn);
return false;
}
switch (msg_header->msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
return home_object_->on_shard_message_pre_commit(lsn, header, key, ctx);
}
default: {
break;
}
}
return true;
}

void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& ctx) {
LOGI("on_rollback with lsn:{}", lsn);
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
if (msg_header->corrupted()) {
LOGE("corrupted message in rollback, lsn:{}", lsn);
return;
}
switch (msg_header->msg_type) {
case ReplicationMessageType::SEAL_SHARD_MSG: {
home_object_->on_shard_message_rollback(lsn, header, key, ctx);
break;
}
default: {
break;
}
}
}

void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
Expand Down

0 comments on commit 3b05a73

Please sign in to comment.