From f3a4ed919175d6e1f5ca4a903c8495b852516351 Mon Sep 17 00:00:00 2001 From: Jie Yao Date: Thu, 18 Jan 2024 04:52:46 -0700 Subject: [PATCH] SealShard state change in pre-commit --- .../homestore_backend/heap_chunk_selector.cpp | 8 +- .../homestore_backend/heap_chunk_selector.h | 1 + src/lib/homestore_backend/hs_blob_manager.cpp | 65 +++++++++- src/lib/homestore_backend/hs_homeobject.cpp | 8 +- src/lib/homestore_backend/hs_homeobject.hpp | 23 +++- .../homestore_backend/hs_shard_manager.cpp | 122 ++++++++++++++++-- .../replication_state_machine.cpp | 46 +++++-- .../homestore_backend/tests/hs_blob_tests.cpp | 45 +++++++ 8 files changed, 290 insertions(+), 28 deletions(-) diff --git a/src/lib/homestore_backend/heap_chunk_selector.cpp b/src/lib/homestore_backend/heap_chunk_selector.cpp index 897ab2ad..84e4c6e3 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.cpp +++ b/src/lib/homestore_backend/heap_chunk_selector.cpp @@ -17,7 +17,11 @@ namespace homeobject { // 2 the key collection of m_chunks will never change // this should only be called when initializing HeapChunkSelector in Homestore -void HeapChunkSelector::add_chunk(csharedChunk& chunk) { m_chunks.emplace(VChunk(chunk).get_chunk_id(), chunk); } +void HeapChunkSelector::add_chunk(csharedChunk& chunk) { + auto chunk_id = VChunk(chunk).get_chunk_id(); + m_chunks.emplace(chunk_id, chunk); + m_selected_chunks.emplace(chunk_id); +} void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID, bool add_to_heap) { if (m_chunks.find(chunkID) == m_chunks.end()) { @@ -45,6 +49,7 @@ void HeapChunkSelector::add_chunk_internal(const chunk_num_t chunkID, bool add_t { std::lock_guard< std::mutex > l(m_defrag_mtx); m_defrag_heap.emplace(chunk); + m_selected_chunks.erase(chunkID); } std::lock_guard< std::mutex > l(heapLock); heap.emplace(chunk); @@ -178,6 +183,7 @@ void HeapChunkSelector::remove_chunk_from_defrag_heap(const chunk_num_t chunkID) for (auto& c : chunks) { m_defrag_heap.emplace(c); } + m_selected_chunks.emplace(chunkID); } void HeapChunkSelector::foreach_chunks(std::function< void(csharedChunk&) >&& cb) { diff --git a/src/lib/homestore_backend/heap_chunk_selector.h b/src/lib/homestore_backend/heap_chunk_selector.h index 1ccf5d15..c2ca7cce 100644 --- a/src/lib/homestore_backend/heap_chunk_selector.h +++ b/src/lib/homestore_backend/heap_chunk_selector.h @@ -121,6 +121,7 @@ class HeapChunkSelector : public homestore::ChunkSelector { void add_chunk_internal(const chunk_num_t, bool add_to_heap = true); VChunkDefragHeap m_defrag_heap; + std::unordered_set< chunk_num_t > m_selected_chunks; std::mutex m_defrag_mtx; void remove_chunk_from_defrag_heap(const chunk_num_t); diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 0b37f32f..7f01bafa 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -141,12 +141,73 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s }); } +bool HSHomeObject::on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; + if (hs_ctx != nullptr) { + ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get(); + } + + if (ctx) { RELEASE_ASSERT(!ctx->promise_.isFulfilled(), "on_blob_put_pre_commit promise is already fulfilled"); } + + auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())); + auto shard_id = msg_header->shard_id; + +// used for test +#ifndef NDEBUG + if (smphSignal) smphSignal->acquire(); +#endif + + { + std::scoped_lock lock_guard(_shard_lock); + auto shard_iter = _shard_map.find(shard_id); + RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info"); + if ((shard_iter->second->get()->info).state == ShardInfo::State::SEALED) { + LOGE("Shard {} is sealed when pre_commit put_blob", shard_id); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::SEALED_SHARD)); } + // we return false here, so on_blob_put_commit will not be called. + // instead, on_blob_put_rollback will be called. + + // TODO: solo_repl_dev not check the returen value of pre_commit. this logic should be added to + // solo_repl_dev,if we get a false from pre_commit, we should call on_blob_put_rollback. + return false; + } + } + return true; +} + +void HSHomeObject::on_blob_put_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + if (!hs_ctx) return; + auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())); + if (msg_header->corrupted()) { + LOGE("replication message header is corrupted with crc error, lsn:{}", lsn); + // TODO: stale blks will be left, GC will take care of it. + return; + } + + auto& pg_id = msg_header->pg_id; + shared< homestore::ReplDev > repl_dev; + { + std::shared_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(pg_id); + RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); + repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_; + } + RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null"); + repl_dev->async_free_blks(lsn, hs_ctx->get_local_blkid()); +} + void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; if (hs_ctx != nullptr) { ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get(); + if (ctx->promise_.isFulfilled()) { + LOGE("on_blob_put_commit promise is already fulfilled in pre_commit"); + return; + } } auto msg_header = r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())); @@ -392,9 +453,7 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis } auto& multiBlks = r.value(); - if (multiBlks != tombstone_pbas) { - repl_dev->async_free_blks(lsn, multiBlks); - } + if (multiBlks != tombstone_pbas) { repl_dev->async_free_blks(lsn, multiBlks); } if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); } } diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 24162700..c2901f01 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -178,7 +178,13 @@ void HSHomeObject::init_cp() { std::move(std::make_unique< HomeObjCPCallbacks >(this))); } -// void HSHomeObject::trigger_timed_events() { persist_pg_sb(); } +#ifndef NDEBUG +void HSHomeObject::set_semaphore() { smphSignal = std::make_shared< std::binary_semaphore >(0); } + +void HSHomeObject::release_semaphore() { + if (smphSignal) smphSignal->release(); +} +#endif void HSHomeObject::register_homestore_metablk_callback() { // register some callbacks for metadata recovery; diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index d04fac5d..4286510a 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -2,6 +2,7 @@ #include #include +#include #include #include @@ -216,6 +217,10 @@ class HSHomeObject : public HomeObjectImpl { shared< HeapChunkSelector > chunk_selector_; bool recovery_done_{false}; + // used for testing when we only have solo_repl_dev. + // TODO: remove or change this when we have raft_repl_dev. + shared< std::binary_semaphore > smphSignal; + private: static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); } @@ -229,6 +234,10 @@ class HSHomeObject : public HomeObjectImpl { void update_shard_in_map(const ShardInfo& shard_info); void do_shard_message_commit(int64_t lsn, ReplicationMessageHeader& header, homestore::MultiBlkId const& blkids, sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx); + bool do_shard_message_pre_commit(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value, + cintrusive< homestore::repl_req_ctx >& hs_ctx); + void do_shard_message_rollback(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value, + cintrusive< homestore::repl_req_ctx >& hs_ctx); // recover part void register_homestore_metablk_callback(); void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie); @@ -291,15 +300,19 @@ class HSHomeObject : public HomeObjectImpl { cshared< HeapChunkSelector > chunk_selector() const { return chunk_selector_; } bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - cintrusive< homestore::repl_req_ctx >&); + cintrusive< homestore::repl_req_ctx >& hs_ctx); void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, - cintrusive< homestore::repl_req_ctx >&); + cintrusive< homestore::repl_req_ctx >& hs_ctx); void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx); // Blob manager related. void on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx); + bool on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& hs_ctx); + void on_blob_put_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& hs_ctx); void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx); homestore::blk_alloc_hints blob_put_get_blk_alloc_hints(sisl::blob const& header, @@ -321,7 +334,11 @@ class HSHomeObject : public HomeObjectImpl { const BlobInfo& blob_info); void print_btree_index(pg_id_t pg_id); - // void trigger_timed_events(); + // used for testing when we only have solo_repl_dev. +#ifndef NDEBUG + void set_semaphore(); + void release_semaphore(); +#endif }; class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks { diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index c1d3c4fd..a078a674 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -142,6 +142,100 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const return req->result(); } +bool HSHomeObject::on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + if (hs_ctx != nullptr) { + auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx); + return do_shard_message_pre_commit( + lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())), ctx->hdr_buf_, hs_ctx); + } + return true; +} + +bool HSHomeObject::do_shard_message_pre_commit(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr}; + if (hs_ctx != nullptr) { + ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get(); + } + + if (ctx) { + RELEASE_ASSERT(!ctx->promise_.isFulfilled(), "do_shard_message_pre_commit promise is already fulfilled"); + } + + if (header.corrupted()) { + LOGW("replication message header is corrupted with crc error, lsn:{}", lsn); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } + return false; + } + + if (crc32_ieee(init_crc32, r_cast< const unsigned char* >(value.bytes()), value.size()) != header.payload_crc) { + // header & value is inconsistent; + LOGW("replication message header is inconsistent with value, lsn:{}", lsn); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } + return false; + } + + auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes()), value.size()); + switch (header.msg_type) { + case ReplicationMessageType::SEAL_SHARD_MSG: { + // we can not release chunk here, since if rollback happens, we can not make sure we can get the same chunk. + // chunk selector will always return the a chunk of least used, and GC will happen at the time window + // we can not make sure we can get the same chunk as before, so we need to wait for the commit phase to release + // chunk; + update_shard_in_map(shard_info); + break; + } + default: { + break; + } + } + + return true; +} + +void HSHomeObject::on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& hs_ctx) { + if (hs_ctx != nullptr) { + auto ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx); + do_shard_message_rollback(lsn, *r_cast< ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())), + ctx->hdr_buf_, hs_ctx); + } +} + +void HSHomeObject::do_shard_message_rollback(int64_t lsn, ReplicationMessageHeader& header, sisl::blob value, + [[maybe_unused]] cintrusive< homestore::repl_req_ctx >& hs_ctx) { + if (header.corrupted()) { + LOGW("replication message header is corrupted with crc error, lsn:{}", lsn); + return; + } + + if (crc32_ieee(init_crc32, r_cast< const unsigned char* >(value.bytes()), value.size()) != header.payload_crc) { + // header & value is inconsistent; + LOGW("replication message header is inconsistent with value, lsn:{}", lsn); + return; + } + + // TODO:: what if we pre_commit successfully , but rollback failed(e.g., log header corrupted) ? + // seem we just crash here if that happens + + auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes()), value.size()); + switch (header.msg_type) { + case ReplicationMessageType::SEAL_SHARD_MSG: { + shard_info.state = ShardInfo::State::OPEN; + update_shard_in_map(shard_info); + break; + } + default: { + break; + } + } + + // promise should not be setvalue in roll back + // 1 if the actual execution happens in pre_commit, the promise will be setvalue in pre_commit + // 2 if the actual execution happens in commit, the promise will be setvalue in commit +} + void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids, homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { @@ -180,24 +274,30 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader homestore::MultiBlkId const& blkids, sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr}; + + // for create_shard, we need to fulfill the promise; + // for seal_shard, the promise is already fulfilled in pre_commit; + bool ctxFullfilled{true}; if (hs_ctx != nullptr) { ctx = boost::static_pointer_cast< repl_result_ctx< ShardManager::Result< ShardInfo > > >(hs_ctx).get(); + ctxFullfilled = ctx->promise_.isFulfilled(); } if (header.corrupted()) { LOGW("replication message header is corrupted with crc error, lsn:{}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } + if (!ctxFullfilled) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } return; } if (crc32_ieee(init_crc32, value.cbytes(), value.size()) != header.payload_crc) { // header & value is inconsistent; LOGW("replication message header is inconsistent with value, lsn:{}", lsn); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } + if (!ctxFullfilled) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); } return; } - auto shard_info = deserialize_shard_info(r_cast< const char* >(value.cbytes()), value.size()); + // TODO:: for seal_shard, what if we pre_commit successfully , but commit failed(e.g., log header corrupted) ? + auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes()), value.size()); switch (header.msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { bool shard_exist = false; @@ -215,7 +315,6 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader break; } - case ReplicationMessageType::SEAL_SHARD_MSG: { ShardInfo::State state; { @@ -225,21 +324,20 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader state = (*iter->second)->info.state; } - if (state == ShardInfo::State::OPEN) { - auto chunk_id = get_shard_chunk(shard_info.id); - RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found"); - chunk_selector()->release_chunk(chunk_id.value()); - update_shard_in_map(shard_info); - } + RELEASE_ASSERT(state == ShardInfo::State::SEALED, "Shard should be sealed before commit"); - break; + auto chunk_id = get_shard_chunk(shard_info.id); + RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found"); + // when restarting, the chunk is not selected since the metablk indicates the shard is sealed. + // handle this in chunk selector + chunk_selector()->release_chunk(chunk_id.value()); } default: { break; } } - if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } + if (!ctxFullfilled) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } } void HSHomeObject::add_new_shard_to_map(ShardPtr&& shard) { diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index af671f10..05f95a2a 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -26,18 +26,48 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c } } -bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const&, sisl::blob const&, - cintrusive< homestore::repl_req_ctx >&) { +bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& ctx) { LOGI("on_pre_commit with lsn:{}", lsn); - // For shard creation, since homestore repldev inside will write shard header to data service first before this - // function is called. So there is nothing is needed to do and we can get the binding chunk_id with the newly shard - // from the blkid in on_commit() - return true; + bool ret{true}; + const ReplicationMessageHeader* msg_header = + r_cast< const ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())); + switch (msg_header->msg_type) { + case ReplicationMessageType::CREATE_SHARD_MSG: + case ReplicationMessageType::SEAL_SHARD_MSG: { + ret = home_object_->on_pre_commit_shard_msg(lsn, header, key, ctx); + break; + } + case ReplicationMessageType::PUT_BLOB_MSG: { + ret = home_object_->on_blob_put_pre_commit(lsn, header, key, ctx); + break; + } + default: { + break; + } + } + return ret; } -void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const&, sisl::blob const&, - cintrusive< homestore::repl_req_ctx >&) { +void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key, + cintrusive< homestore::repl_req_ctx >& ctx) { LOGI("on_rollback with lsn:{}", lsn); + const ReplicationMessageHeader* msg_header = + r_cast< const ReplicationMessageHeader* >(const_cast< uint8_t* >(header.cbytes())); + switch (msg_header->msg_type) { + case ReplicationMessageType::CREATE_SHARD_MSG: + case ReplicationMessageType::SEAL_SHARD_MSG: { + home_object_->on_rollback_shard_msg(lsn, header, key, ctx); + break; + } + case ReplicationMessageType::PUT_BLOB_MSG: { + home_object_->on_blob_put_rollback(lsn, header, key, ctx); + break; + } + default: { + break; + } + } } homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) { diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index e6f7b8bd..7c4999b7 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -217,3 +217,48 @@ TEST_F(HomeObjectFixture, PGStatsTest) { auto stats = _obj_inst->get_stats(); LOGINFO("HomeObj stats: {}", stats.to_string()); } + +#ifndef NDEBUG +TEST_F(HomeObjectFixture, SealShardWithPutBlob) { + pg_id_t pg_id{1}; + create_pg(pg_id); + + auto s = _obj_inst->shard_manager()->create_shard(pg_id, 64 * Mi).get(); + ASSERT_TRUE(!!s); + auto shard_info = s.value(); + auto shard_id = shard_info.id; + s = _obj_inst->shard_manager()->get_shard(shard_id).get(); + ASSERT_TRUE(!!s); + + // normal put blob should succeed + LOGINFO("Got shard {}", shard_id); + shard_info = s.value(); + EXPECT_EQ(shard_info.id, shard_id); + EXPECT_EQ(shard_info.placement_group, pg_id); + EXPECT_EQ(shard_info.state, ShardInfo::State::OPEN); + auto b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get(); + ASSERT_TRUE(!!b); + LOGINFO("Put blob {}", b.value()); + + auto hs_homeobject = dynamic_cast< HSHomeObject* >(_obj_inst.get()); + hs_homeobject->set_semaphore(); + // shard is open, so after set semaphore, put blob will block in pre_commit waiting for semaphore to be released. + std::thread blocking_put_blob([this, shard_id] { + auto b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get(); + ASSERT_FALSE(b); + ASSERT_TRUE(b.error() == BlobError::SEALED_SHARD); + }); + + s = _obj_inst->shard_manager()->seal_shard(shard_id).get(); + ASSERT_TRUE(!!s); + shard_info = s.value(); + EXPECT_EQ(shard_info.id, shard_id); + EXPECT_EQ(shard_info.placement_group, pg_id); + EXPECT_EQ(shard_info.state, ShardInfo::State::SEALED); + LOGINFO("Sealed shard {}", shard_id); + + // now shard is sealed, so put blob should fail immediately + hs_homeobject->release_semaphore(); + blocking_put_blob.join(); +} +#endif \ No newline at end of file