Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Dec 3, 2023
1 parent 8457aaa commit 08f639f
Showing 1 changed file with 10 additions and 7 deletions.
17 changes: 10 additions & 7 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s

bool HSHomeObject::on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

if (ctx) { RELEASE_ASSERT(!ctx->promise_.isFulfilled(), "on_blob_put_pre_commit promise is already fulfilled"); }

auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes);
auto shard_id = msg_header->shard_id;
{
Expand All @@ -142,6 +149,7 @@ bool HSHomeObject::on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header,
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
if ((shard_iter->second->get()->info).state == ShardInfo::State::SEALED) {
LOGE("Shard {} is sealed when pre_commit put_blob", shard_id);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::SEALED_SHARD)); }
// we return false here, so on_blob_put_commit will not be called.
// instead, on_blob_put_rollback will be called.
return false;
Expand All @@ -152,14 +160,11 @@ bool HSHomeObject::on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header,

void HSHomeObject::on_blob_put_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}
if (!hs_ctx) return;
auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes);
if (msg_header->corrupted()) {
LOGE("replication message header is corrupted with crc error, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::CHECKSUM_MISMATCH)); }
// TODO: stale blks will be left, GC will take care of it.
return;
}

Expand All @@ -173,8 +178,6 @@ void HSHomeObject::on_blob_put_rollback(int64_t lsn, sisl::blob const& header, s
}
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
repl_dev->async_free_blks(lsn, hs_ctx->get_local_blkid());

if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::SEALED_SHARD)); }
}

void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
Expand Down

0 comments on commit 08f639f

Please sign in to comment.