Skip to content

Commit

Permalink
add unit test
Browse files Browse the repository at this point in the history
  • Loading branch information
JacksonYao287 committed Dec 3, 2023
1 parent 8457aaa commit 67d9c61
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,14 +134,26 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s

bool HSHomeObject::on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}

if (ctx) { RELEASE_ASSERT(!ctx->promise_.isFulfilled(), "on_blob_put_pre_commit promise is already fulfilled"); }

auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes);
auto shard_id = msg_header->shard_id;

// used for test
if (smphSignal) smphSignal->acquire();

{
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(shard_id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
if ((shard_iter->second->get()->info).state == ShardInfo::State::SEALED) {
LOGE("Shard {} is sealed when pre_commit put_blob", shard_id);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::SEALED_SHARD)); }
// we return false here, so on_blob_put_commit will not be called.
// instead, on_blob_put_rollback will be called.
return false;
Expand All @@ -152,14 +164,11 @@ bool HSHomeObject::on_blob_put_pre_commit(int64_t lsn, sisl::blob const& header,

void HSHomeObject::on_blob_put_rollback(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
}
if (!hs_ctx) return;
auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes);
if (msg_header->corrupted()) {
LOGE("replication message header is corrupted with crc error, lsn:{}", lsn);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::CHECKSUM_MISMATCH)); }
// TODO: stale blks will be left, GC will take care of it.
return;
}

Expand All @@ -173,8 +182,6 @@ void HSHomeObject::on_blob_put_rollback(int64_t lsn, sisl::blob const& header, s
}
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
repl_dev->async_free_blks(lsn, hs_ctx->get_local_blkid());

if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::SEALED_SHARD)); }
}

void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
Expand Down
6 changes: 6 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@ void HSHomeObject::init_timer_thread() {

void HSHomeObject::trigger_timed_events() { persist_pg_sb(); }

void HSHomeObject::set_semaphore() { smphSignal = std::make_shared< std::binary_semaphore >(0); }

void HSHomeObject::release_semaphore() {
if (smphSignal) smphSignal->release();
}

void HSHomeObject::register_homestore_metablk_callback() {
// register some callbacks for metadata recovery;
using namespace homestore;
Expand Down
7 changes: 7 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <memory>
#include <mutex>
#include <semaphore>

#include <homestore/homestore.hpp>
#include <homestore/index/index_table.hpp>
Expand Down Expand Up @@ -160,6 +161,9 @@ class HSHomeObject : public HomeObjectImpl {
shared< HeapChunkSelector > chunk_selector_;
iomgr::timer_handle_t ho_timer_thread_handle_;

// used for testing
shared< std::binary_semaphore > smphSignal;

private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

Expand Down Expand Up @@ -238,6 +242,9 @@ class HSHomeObject : public HomeObjectImpl {
void print_btree_index(pg_id_t pg_id);

void trigger_timed_events();

void set_semaphore();
void release_semaphore();
};

class BlobIndexServiceCallbacks : public homestore::IndexServiceCallbacks {
Expand Down
43 changes: 43 additions & 0 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,3 +285,46 @@ TEST_F(HomeObjectFixture, SealShardWithRestart) {
ASSERT_EQ(b.error(), BlobError::SEALED_SHARD);
LOGINFO("Put blob {}", b.error());
}

TEST_F(HomeObjectFixture, SealShardWithPutBlob) {
pg_id_t pg_id{1};
create_pg(pg_id);

auto s = _obj_inst->shard_manager()->create_shard(pg_id, 64 * Mi).get();
ASSERT_TRUE(!!s);
auto shard_info = s.value();
auto shard_id = shard_info.id;
s = _obj_inst->shard_manager()->get_shard(shard_id).get();
ASSERT_TRUE(!!s);

// normal put blob should succeed
LOGINFO("Got shard {}", shard_id);
shard_info = s.value();
EXPECT_EQ(shard_info.id, shard_id);
EXPECT_EQ(shard_info.placement_group, pg_id);
EXPECT_EQ(shard_info.state, ShardInfo::State::OPEN);
auto b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get();
ASSERT_TRUE(!!b);
LOGINFO("Put blob {}", b.value());

auto hs_homeobject = dynamic_cast< HSHomeObject* >(_obj_inst.get());
hs_homeobject->set_semaphore();
// shard is open, so after set semaphore, put blob will block in pre_commit for semaphore to be released.
std::thread blocking_put_blob([this, shard_id] {
auto b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get();
ASSERT_FALSE(b);
ASSERT_TRUE(b.error() == BlobError::SEALED_SHARD);
});

s = _obj_inst->shard_manager()->seal_shard(shard_id).get();
ASSERT_TRUE(!!s);
shard_info = s.value();
EXPECT_EQ(shard_info.id, shard_id);
EXPECT_EQ(shard_info.placement_group, pg_id);
EXPECT_EQ(shard_info.state, ShardInfo::State::SEALED);
LOGINFO("Sealed shard {}", shard_id);

// now shard is sealed, so put blob should fail immediately
hs_homeobject->release_semaphore();
blocking_put_blob.join();
}

0 comments on commit 67d9c61

Please sign in to comment.