Skip to content

Commit

Permalink
Add seal shard api (#81)
Browse files Browse the repository at this point in the history
* Add seal shard api.

Add test to seal shard with recovery.
  • Loading branch information
sanebay authored Oct 18, 2023
1 parent 8735ab4 commit f16ed3f
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 12 deletions.
2 changes: 1 addition & 1 deletion src/include/homeobject/blob_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
namespace homeobject {

ENUM(BlobError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_SHARD, UNKNOWN_BLOB, CHECKSUM_MISMATCH,
READ_FAILED, INDEX_ERROR);
READ_FAILED, INDEX_ERROR, SEALED_SHARD);

struct Blob {
Blob(sisl::io_blob_safe b, std::string const& u, uint64_t o) : body(std::move(b)), user_key(u), object_off(o) {}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob
return _get_shard(shard).thenValue(
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::AsyncResult< blob_id_t > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::INVALID_ARG);
if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::SEALED_SHARD);
return _put_blob(e.value(), std::move(blob));
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class HomeObjectImpl : public HomeObject,

/// Implementation defines these
virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0;
virtual ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) = 0;
virtual ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) = 0;

virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0;
virtual BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ using BlobIndexTable = homestore::IndexTable< BlobRouteKey, BlobRouteValue >;
class HSHomeObject : public HomeObjectImpl {
/// Overridable Helpers
ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override;
ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override;

BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override;
BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
Expand Down
43 changes: 41 additions & 2 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,44 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
});
}

ShardManager::Result< ShardInfo > HSHomeObject::_seal_shard(shard_id_t id) {
return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
ShardManager::AsyncResult< ShardInfo > HSHomeObject::_seal_shard(ShardInfo const& info) {
auto& pg_id = info.placement_group;
auto& shard_id = info.id;
ShardInfo shard_info = info;

shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev null");
}

shard_info.state = ShardInfo::State::SEALED;
auto seal_shard_message = serialize_shard_info(shard_info);
const auto msg_size = sisl::round_up(seal_shard_message.size(), repl_dev->get_blk_size());
auto req = repl_result_ctx< ShardManager::Result< ShardInfo > >::make(msg_size, 512 /*alignment*/);
auto buf_ptr = req->hdr_buf_.bytes;
std::memset(buf_ptr, 0, msg_size);
std::memcpy(buf_ptr, seal_shard_message.c_str(), seal_shard_message.size());

req->header_.msg_type = ReplicationMessageType::SEAL_SHARD_MSG;
req->header_.pg_id = pg_id;
req->header_.shard_id = shard_id;
req->header_.payload_size = msg_size;
req->header_.payload_crc = crc32_ieee(init_crc32, buf_ptr, msg_size);
req->header_.seal();
sisl::blob header;
header.bytes = r_cast< uint8_t* >(&req->header_);
header.size = sizeof(req->header_);
sisl::sg_list value;
value.size = msg_size;
value.iovs.push_back(iovec(buf_ptr, msg_size));

// replicate this seal shard message to PG members;
repl_dev->async_alloc_write(header, sisl::blob{}, value, req);
return req->result();
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
Expand Down Expand Up @@ -152,6 +188,9 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader
}

case ReplicationMessageType::SEAL_SHARD_MSG: {
auto chunk_id = get_shard_chunk(shard_info.id);
RELEASE_ASSERT(chunk_id.has_value(), "Chunk id not found");
chunk_selector()->release_chunk(chunk_id.value());
update_shard_in_map(shard_info);
break;
}
Expand Down
54 changes: 54 additions & 0 deletions src/lib/homestore_backend/tests/test_home_object.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,60 @@ TEST_F(HomeObjectFixture, BasicPutGetBlob) {
}
}

TEST_F(HomeObjectFixture, SealShard) {
// Create a pg, shard, put blob should succeed, seal and put blob again should fail.
// Recover and put blob again should fail.
pg_id_t pg_id{1};
create_pg(pg_id);

auto s = _obj_inst->shard_manager()->create_shard(pg_id, 64 * Mi).get();
ASSERT_TRUE(!!s);
auto shard_info = s.value();
auto shard_id = shard_info.id;
s = _obj_inst->shard_manager()->get_shard(shard_id).get();
ASSERT_TRUE(!!s);

LOGINFO("Got shard {}", shard_id);
shard_info = s.value();
EXPECT_EQ(shard_info.id, shard_id);
EXPECT_EQ(shard_info.placement_group, pg_id);
EXPECT_EQ(shard_info.state, ShardInfo::State::OPEN);
auto b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get();
ASSERT_TRUE(!!b);
LOGINFO("Put blob {}", b.value());

s = _obj_inst->shard_manager()->seal_shard(shard_id).get();
ASSERT_TRUE(!!s);
shard_info = s.value();
EXPECT_EQ(shard_info.id, shard_id);
EXPECT_EQ(shard_info.placement_group, pg_id);
EXPECT_EQ(shard_info.state, ShardInfo::State::SEALED);
LOGINFO("Sealed shard {}", shard_id);

b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get();
ASSERT_TRUE(!b);
ASSERT_EQ(b.error(), BlobError::SEALED_SHARD);
LOGINFO("Put blob {}", b.error());

// Restart homeobject
restart();

// Verify shard is sealed.
s = _obj_inst->shard_manager()->get_shard(shard_id).get();
ASSERT_TRUE(!!s);

LOGINFO("After restart shard {}", shard_id);
shard_info = s.value();
EXPECT_EQ(shard_info.id, shard_id);
EXPECT_EQ(shard_info.placement_group, pg_id);
EXPECT_EQ(shard_info.state, ShardInfo::State::SEALED);

b = _obj_inst->blob_manager()->put(shard_id, Blob{sisl::io_blob_safe(512u, 512u), "test_blob", 0ul}).get();
ASSERT_TRUE(!b);
ASSERT_EQ(b.error(), BlobError::SEALED_SHARD);
LOGINFO("Put blob {}", b.error());
}

int main(int argc, char* argv[]) {
int parsed_argc = argc;
::testing::InitGoogleTest(&parsed_argc, argv);
Expand Down
2 changes: 1 addition & 1 deletion src/lib/memory_backend/mem_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class MemoryHomeObject : public HomeObjectImpl {
/// Helpers
// ShardManager
ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override;
ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override;

// BlobManager
BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/memory_backend/mem_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t p
return info;
}

ShardManager::Result< ShardInfo > MemoryHomeObject::_seal_shard(shard_id_t id) {
ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_seal_shard(ShardInfo const& info) {
auto lg = std::scoped_lock(_shard_lock);
auto shard_it = _shard_map.find(id);
auto shard_it = _shard_map.find(info.id);
RELEASE_ASSERT(_shard_map.end() != shard_it, "Missing ShardIterator!");
auto& shard_info = (*shard_it->second)->info;
shard_info.state = ShardInfo::State::SEALED;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id_t pgid)
}

ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id_t id) {
return _get_shard(id).thenValue([this](auto const e) mutable -> ShardManager::Result< ShardInfo > {
return _get_shard(id).thenValue([this](auto const e) mutable -> ShardManager::AsyncResult< ShardInfo > {
if (!e) return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().state) return e.value();
return _seal_shard(e.value().id);
return _seal_shard(e.value());
});
}

Expand Down
2 changes: 1 addition & 1 deletion src/lib/tests/BlobManagerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ TEST_F(TestFixture, BasicTests) {
auto p_e =
homeobj_->blob_manager()->put(_shard_1.id, Blob{sisl::io_blob_safe(4 * Ki, 512u), "test_blob", 4 * Mi}).get();
ASSERT_TRUE(!p_e);
EXPECT_EQ(BlobError::INVALID_ARG, p_e.error());
EXPECT_EQ(BlobError::SEALED_SHARD, p_e.error());

EXPECT_TRUE(homeobj_->blob_manager()->del(_shard_1.id, _blob_id).get());
EXPECT_EQ(BlobError::UNKNOWN_BLOB, homeobj_->blob_manager()->get(_shard_1.id, _blob_id).get().error());
Expand Down

0 comments on commit f16ed3f

Please sign in to comment.