address review comments
zichanglai committed Oct 6, 2023
1 parent 161cef0 commit 9853736
Showing 12 changed files with 66 additions and 76 deletions.
src/include/homeobject/shard_manager.hpp (2 additions, 1 deletion)
@@ -9,7 +9,8 @@

namespace homeobject {

-ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_PG, UNKNOWN_SHARD, PG_NOT_READY);
+ENUM(ShardError, uint16_t, UNKNOWN = 1, TIMEOUT, INVALID_ARG, NOT_LEADER, UNKNOWN_PG, UNKNOWN_SHARD, PG_NOT_READY,
+     CRC_MISMATCH);

struct ShardInfo {
enum class State : uint8_t {
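The new enum value gives CRC failures in the replication commit path (see hs_shard_manager.cpp below) their own error code instead of overloading INVALID_ARG. As a hedged sketch, a caller switching on ShardError might now distinguish the two cases like this; retry_replication() and reject_request() are hypothetical handlers, not part of this commit:

```cpp
void retry_replication(); // hypothetical handler
void reject_request();    // hypothetical handler

// Illustrative only: branching on the new error value from shard_manager.hpp.
void handle_shard_error(ShardError err) {
    switch (err) {
    case ShardError::CRC_MISMATCH:
        retry_replication(); // payload failed its checksum: a data problem, not a caller problem
        break;
    case ShardError::INVALID_ARG:
        reject_request();    // genuinely bad input from the caller
        break;
    default:
        break;
    }
}
```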
src/lib/blob_manager.cpp (4 additions, 4 deletions)
@@ -8,23 +8,23 @@ BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id_t shard, blob_id_t
uint64_t len) const {
return _get_shard(shard).thenValue([this, blob](auto const e) -> BlobManager::Result< Blob > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
-        return _get_blob(e.value()->info, blob);
+        return _get_blob(e.value(), blob);
});
}

BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob&& blob) {
return _get_shard(shard).thenValue(
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::Result< blob_id_t > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
-            if (ShardInfo::State::SEALED == e.value()->info.state) return folly::makeUnexpected(BlobError::INVALID_ARG);
-            return _put_blob(e.value()->info, std::move(blob));
+            if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::INVALID_ARG);
+            return _put_blob(e.value(), std::move(blob));
});
}

BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) {
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullResult {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
-        return _del_blob(e.value()->info, blob);
+        return _del_blob(e.value(), blob);
});
}

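Since _get_shard now resolves to a ShardManager::Result< ShardInfo > rather than a ShardPtr (see homeobject_impl.hpp below), each handler above receives the shard metadata by value instead of reaching through shared state. A minimal sketch of the folly Expected-through-Future pattern these handlers use, with stand-in types (Info, Error, lookup) replacing ShardInfo, BlobError, and _get_shard:

```cpp
#include <folly/Expected.h>
#include <folly/futures/Future.h>

// Stand-ins for ShardInfo / BlobError / _get_shard; illustrative only.
enum class Error { UNKNOWN_SHARD, INVALID_ARG };
struct Info { bool sealed{false}; };

folly::Future< folly::Expected< Info, Error > > lookup() {
    return folly::makeFuture(folly::Expected< Info, Error >(Info{}));
}

folly::Future< folly::Expected< int, Error > > put_sketch() {
    return lookup().thenValue([](auto const e) -> folly::Expected< int, Error > {
        if (!e) return folly::makeUnexpected(Error::UNKNOWN_SHARD); // propagate lookup failure
        if (e.value().sealed) return folly::makeUnexpected(Error::INVALID_ARG);
        return 42; // success path works on the Info copy
    });
}
```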
src/lib/homeobject_impl.hpp (3 additions, 3 deletions)
@@ -49,7 +49,7 @@ struct Shard {
ShardInfo info;
};

-using ShardPtr = shared< Shard >;
+using ShardPtr = unique< Shard >;
using ShardPtrList = std::list< ShardPtr >;
using ShardIterator = ShardPtrList::iterator;

@@ -73,14 +73,14 @@ class HomeObjectImpl : public HomeObject,
public std::enable_shared_from_this< HomeObjectImpl > {

/// Implementation defines these
-    virtual ShardManager::Result< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0;
+    virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0;
virtual ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) = 0;

virtual BlobManager::Result< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0;
virtual BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id_t) const = 0;
virtual BlobManager::NullResult _del_blob(ShardInfo const&, blob_id_t) = 0;
///
-    folly::Future< ShardManager::Result< ShardPtr > > _get_shard(shard_id_t id) const;
+    folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id_t id) const;
auto _defer() const { return folly::makeSemiFuture().via(folly::getGlobalCPUExecutor()); }

virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> > peers) = 0;
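ShardPtr changing from shared< Shard > to unique< Shard > (presumably aliases of std::shared_ptr/std::unique_ptr) makes the per-PG shard list the sole owner, which is why _get_shard now hands back a ShardInfo copy rather than a pointer. A small self-contained sketch of that ownership shape, under that aliasing assumption:

```cpp
#include <list>
#include <memory>

struct Shard {
    int info; // stands in for ShardInfo
    explicit Shard(int i) : info(i) {}
};
using ShardPtr = std::unique_ptr< Shard >; // single owner, per the diff

int main() {
    std::list< ShardPtr > shards;
    auto shard = std::make_unique< Shard >(7);
    int info_copy = shard->info;                    // callers get copies of the metadata
    shards.emplace(shards.end(), std::move(shard)); // the list takes ownership
    Shard* observer = shards.front().get();         // non-owning view only
    return observer->info == info_copy ? 0 : 1;
}
```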
src/lib/homestore_backend/hs_homeobject.cpp (2 additions, 3 deletions)
@@ -97,8 +97,7 @@ void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte
}

if (pg_is_recovered) {
-        auto hs_shard = std::make_shared< HS_Shard >(sb);
-        add_new_shard_to_map(hs_shard);
+        add_new_shard_to_map(std::make_unique< HS_Shard >(sb));
return;
}

@@ -115,7 +114,7 @@ void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) {
for (auto& pair : _pg_map) {
for (auto& shard : pair.second->shards_) {
if (shard->info.state == ShardInfo::State::OPEN) {
-                excluding_chunks.emplace(dp_cast< HS_Shard >(shard)->sb_->chunk_id);
+                excluding_chunks.emplace(d_cast< HS_Shard* >(shard.get())->sb_->chunk_id);
}
}
}
src/lib/homestore_backend/hs_homeobject.hpp (6 additions, 4 deletions)
@@ -19,7 +19,7 @@ namespace homeobject {

class HSHomeObject : public HomeObjectImpl {
/// Overridable Helpers
-    ShardManager::Result< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
+    ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override;

BlobManager::Result< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override;
@@ -62,7 +62,7 @@ class HSHomeObject : public HomeObjectImpl {
struct HS_PG : public PG {
homestore::superblk< pg_info_superblk > pg_sb_;
        shared< homestore::ReplDev > repl_dev_;
-
+        std::optional< homestore::chunk_num_t > any_allocated_chunk_id_{};
HS_PG(PGInfo info, shared< homestore::ReplDev > rdev);
HS_PG(homestore::superblk< pg_info_superblk > const& sb, shared< homestore::ReplDev > rdev);
virtual ~HS_PG() = default;
@@ -74,7 +74,7 @@ class HSHomeObject : public HomeObjectImpl {
homestore::superblk< shard_info_superblk > sb_;
HS_Shard(ShardInfo info, homestore::chunk_num_t chunk_id);
HS_Shard(homestore::superblk< shard_info_superblk > const& sb);
-        virtual ~HS_Shard() = default;
+        ~HS_Shard() override = default;

void update_info(const ShardInfo& info);
void write_sb();
@@ -95,7 +95,7 @@ class HSHomeObject : public HomeObjectImpl {

static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size);
static std::string serialize_shard_info(const ShardInfo& info);
-    void add_new_shard_to_map(ShardPtr shard);
+    void add_new_shard_to_map(ShardPtr&& shard);
void update_shard_in_map(const ShardInfo& shard_info);
void do_shard_message_commit(int64_t lsn, ReplicationMessageHeader& header, homestore::MultiBlkId const& blkids,
sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx);
@@ -116,6 +116,8 @@

ShardManager::Result< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const;

+    std::optional< homestore::chunk_num_t > get_any_chunk_id(pg_id_t const pg);
+
shared< HeapChunkSelector > chunk_selector() { return chunk_selector_; }
};

src/lib/homestore_backend/hs_pg_manager.cpp (1 addition, 2 deletions)
@@ -127,8 +127,7 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
auto iter = pending_recovery_shards_.find(pg_sb->id);
if (iter != pending_recovery_shards_.end()) {
for (auto& sb : iter->second) {
-            auto hs_shard = std::make_shared< HS_Shard >(sb);
-            add_new_shard_to_map(hs_shard);
+            add_new_shard_to_map(std::make_unique< HS_Shard >(sb));
}
pending_recovery_shards_.erase(iter);
}
src/lib/homestore_backend/hs_shard_manager.cpp (33 additions, 35 deletions)
@@ -55,7 +55,7 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_
return shard_info;
}

-ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) {
+ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) {
shared< homestore::ReplDev > repl_dev;
{
std::shared_lock lock_guard(_pg_lock);
@@ -96,12 +96,10 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_owner,
value.iovs.push_back(iovec(buf_ptr, msg_size));
// replicate this create shard message to PG members;
repl_dev->async_alloc_write(header, sisl::blob{}, value, req);
-    auto info = req->result().get();
-    if (!bool(info)) {
-        LOGW("create new shard [{}] on pg [{}] is failed with error:{}", new_shard_id & shard_mask, pg_owner,
-             info.error());
-    }
-    return info;
+    return req->result().deferValue([this](auto const& e) -> ShardManager::Result< ShardInfo > {
+        if (!e) return folly::makeUnexpected(e.error());
+        return e.value();
+    });
}

ShardManager::Result< ShardInfo > HSHomeObject::_seal_shard(shard_id_t id) {
@@ -123,28 +121,10 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header
    // because this raft log had been committed before HO restarted, and the commit result is already saved in HS
    // metablks; when HS restarts, all PG/Shard info will be recovered from HS metablks, and if we commit again, it
    // will cause duplication;
-#if 0
-    sisl::sg_list value;
-    value.size = blkids.blk_count() * repl_dev->get_blk_size();
-    auto value_buf = iomanager.iobuf_alloc(512, value.size);
-    value.iovs.push_back(iovec{.iov_base = value_buf, .iov_len = value.size});
-    // header will be released when this function returns, but we still need the header when async_read() finished.
-    ReplicationMessageHeader msg_header = *r_cast< const ReplicationMessageHeader* >(header.bytes);
-    repl_dev->async_read(blkids, value, value.size).thenValue([this, lsn, msg_header, blkids, value](auto&& err) {
-        if (err) {
-            LOGW(homeobject, "failed to read data from homestore pba, lsn:{}", lsn);
-        } else {
-            sisl::blob value_blob(r_cast< uint8_t* >(value.iovs[0].iov_base), value.size);
-            do_shard_message_commit(lsn, msg_header, blkids, value_blob, nullptr);
-        }
-        iomanager.iobuf_free(r_cast< uint8_t* >(value.iovs[0].iov_base));
-    });
-#endif
}

void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader& header,
                                           homestore::MultiBlkId const& blkids, sisl::blob value,
-
                                           cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
if (hs_ctx != nullptr) {
@@ -153,22 +133,21 @@ void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader

if (header.corrupted()) {
LOGW("replication message header is corrupted with crc error, lsn:{}", lsn);
-        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); }
+        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return;
}

if (crc32_ieee(init_crc32, value.bytes, value.size) != header.payload_crc) {
        // header & value are inconsistent;
LOGW("replication message header is inconsistent with value, lsn:{}", lsn);
-        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::INVALID_ARG)); }
+        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(ShardError::CRC_MISMATCH)); }
return;
}

auto shard_info = deserialize_shard_info(r_cast< const char* >(value.bytes), value.size);
switch (header.msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
-        auto hs_shard = std::make_shared< HS_Shard >(shard_info, blkids.chunk_num());
-        add_new_shard_to_map(hs_shard);
+        add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num()));
break;
}

@@ -184,38 +163,57 @@
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }
}

-void HSHomeObject::add_new_shard_to_map(ShardPtr shard) {
+void HSHomeObject::add_new_shard_to_map(ShardPtr&& shard) {
// TODO: We are taking a global lock for all pgs to create shard. Is it really needed??
// We need to have fine grained per PG lock and take only that.
std::scoped_lock lock_guard(_pg_lock, _shard_lock);
auto pg_iter = _pg_map.find(shard->info.placement_group);
RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info");
auto& shards = pg_iter->second->shards_;
-    auto iter = shards.emplace(shards.end(), shard);
-    auto [_, happened] = _shard_map.emplace(shard->info.id, iter);
+    auto shard_id = shard->info.id;
+    auto iter = shards.emplace(shards.end(), std::move(shard));
+    auto [_, happened] = _shard_map.emplace(shard_id, iter);
RELEASE_ASSERT(happened, "duplicated shard info");

    // the following part gives follower members a chance to catch up on the shard sequence num;
-    auto sequence_num = get_sequence_num_from_shard_id(shard->info.id);
+    auto sequence_num = get_sequence_num_from_shard_id(shard_id);
if (sequence_num > pg_iter->second->shard_sequence_num_) { pg_iter->second->shard_sequence_num_ = sequence_num; }
}

void HSHomeObject::update_shard_in_map(const ShardInfo& shard_info) {
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(shard_info.id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
-    auto hs_shard = dp_cast< HS_Shard >(*(shard_iter->second));
+    auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
hs_shard->update_info(shard_info);
}

ShardManager::Result< homestore::chunk_num_t > HSHomeObject::get_shard_chunk(shard_id_t id) const {
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(id);
if (shard_iter == _shard_map.end()) { return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); }
-    auto hs_shard = dp_cast< HS_Shard >(*(shard_iter->second));
+    auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
return hs_shard->sb_->chunk_id;
}

+std::optional< homestore::chunk_num_t > HSHomeObject::get_any_chunk_id(pg_id_t const pg_id) {
+    std::scoped_lock lock_guard(_pg_lock);
+    auto pg_iter = _pg_map.find(pg_id);
+    RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info");
+    HS_PG* pg = static_cast< HS_PG* >(pg_iter->second.get());
+    if (pg->any_allocated_chunk_id_.has_value()) {
+        // it is already cached; use it
+        return pg->any_allocated_chunk_id_;
+    }
+
+    auto& shards = pg->shards_;
+    if (shards.empty()) { return std::optional< homestore::chunk_num_t >(); }
+    auto hs_shard = d_cast< HS_Shard* >(shards.front().get());
+    // cache it
+    pg->any_allocated_chunk_id_ = hs_shard->sb_->chunk_id;
+    return pg->any_allocated_chunk_id_;
+}
+
HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t chunk_id) :
Shard(std::move(shard_info)), sb_("ShardManager") {
sb_.create(sizeof(shard_info_superblk));
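The most consequential rewrite in this file is _create_shard: instead of blocking on req->result().get() inside the replication call path, it now returns the future with the unwrap attached via deferValue, which also motivates the Result to AsyncResult signature change. A compact before/after sketch, assuming folly and a toy replicate() standing in for repl_dev->async_alloc_write plus the request's promise:

```cpp
#include <folly/Expected.h>
#include <folly/futures/Future.h>
#include <string>

// Toy stand-ins; Result mirrors ShardManager::Result's Expected shape.
template < typename T >
using Result = folly::Expected< T, std::string >;

folly::SemiFuture< Result< int > > replicate() { // hypothetical async op
    return folly::makeSemiFuture(Result< int >(42));
}

// Before (blocking): auto info = replicate().get(); return info;
// After (async): hand back a future with the continuation attached.
folly::SemiFuture< Result< int > > create_async() {
    return replicate().deferValue([](auto const& e) -> Result< int > {
        if (!e) return folly::makeUnexpected(e.error()); // propagate error
        return e.value();                                // unwrap success
    });
}
```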
src/lib/homestore_backend/replication_state_machine.cpp (3 additions, 10 deletions)
@@ -39,19 +39,12 @@ homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::bl
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.bytes);
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
-        auto list_shard_result = home_object_->shard_manager()->list_shards(msg_header->pg_id).get();
-        if (sisl_unlikely(!list_shard_result)) {
-            LOGWARNMOD(homeobject, "list shards failed from pg {}", msg_header->pg_id);
-            break;
-        }
-
-        if (list_shard_result.value().empty()) {
+        auto any_allocated_chunk_id = home_object_->get_any_chunk_id(msg_header->pg_id);
+        if (!any_allocated_chunk_id.has_value()) {
            // pg is empty without any shards; we leave the decision to the HeapChunkSelector to select a pdev
            // with the most available space and then select one chunk based on that pdev
        } else {
-            auto chunk_id = home_object_->get_shard_chunk(list_shard_result.value().back().id);
-            RELEASE_ASSERT(!!chunk_id, "unknown shard id to get binded chunk");
-            return home_object_->chunk_selector()->chunk_to_hints(chunk_id.value());
+            return home_object_->chunk_selector()->chunk_to_hints(any_allocated_chunk_id.value());
        }
}
break;
}
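Here the hint path stops calling list_shards(...).get(), a blocking round trip that materialized every shard, and instead asks for any chunk already allocated to the PG, which get_any_chunk_id memoizes in HS_PG::any_allocated_chunk_id_. A tiny sketch of that cache-on-first-use pattern with placeholder types:

```cpp
#include <optional>
#include <vector>

// Placeholder shapes: shard_chunks mirrors pg->shards_, cached mirrors
// HS_PG::any_allocated_chunk_id_. Illustrative, not the committed code.
struct ToyPG {
    std::vector< int > shard_chunks;
    std::optional< int > cached;
};

std::optional< int > any_chunk(ToyPG& pg) {
    if (pg.cached.has_value()) { return pg.cached; }       // hit: O(1) thereafter
    if (pg.shard_chunks.empty()) { return std::nullopt; }  // empty PG: no hint
    pg.cached = pg.shard_chunks.front();                   // fill cache on first lookup
    return pg.cached;
}
```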
src/lib/homestore_backend/tests/test_shard_manager.cpp (2 additions, 2 deletions)
@@ -277,7 +277,7 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
EXPECT_TRUE(pg_iter != ho->_pg_map.end());
auto& pg_result = pg_iter->second;
EXPECT_EQ(1, pg_result->shards_.size());
-    auto check_shard = pg_result->shards_.front();
+    auto check_shard = pg_result->shards_.front().get();
EXPECT_EQ(ShardInfo::State::OPEN, check_shard->info.state);
    // release the homeobject and homestore will be shut down automatically.
_home_object.reset();
@@ -292,7 +292,7 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
pg_iter = ho->_pg_map.find(_pg_id);
EXPECT_TRUE(pg_iter != ho->_pg_map.end());
EXPECT_EQ(1, pg_iter->second->shards_.size());
-    auto hs_shard = dp_cast< homeobject::HSHomeObject::HS_Shard >(pg_iter->second->shards_.front());
+    auto hs_shard = d_cast< homeobject::HSHomeObject::HS_Shard* >(pg_iter->second->shards_.front().get());
EXPECT_TRUE(hs_shard->info == shard_info);
EXPECT_TRUE(hs_shard->sb_->id == shard_info.id);
EXPECT_TRUE(hs_shard->sb_->placement_group == shard_info.placement_group);
src/lib/memory_backend/mem_homeobject.hpp (1 addition, 1 deletion)
@@ -35,7 +35,7 @@ class MemoryHomeObject : public HomeObjectImpl {

/// Helpers
// ShardManager
-    ShardManager::Result< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
+    ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override;

// BlobManager
src/lib/memory_backend/mem_shard_manager.cpp (2 additions, 3 deletions)
@@ -6,7 +6,7 @@ namespace homeobject {

uint64_t ShardManager::max_shard_size() { return Gi; }

-ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) {
+ShardManager::AsyncResult< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_owner, uint64_t size_bytes) {
auto const now = get_current_timestamp();
auto info = ShardInfo(0ull, pg_owner, ShardInfo::State::OPEN, now, now, size_bytes, size_bytes, 0);
{
@@ -16,8 +16,7 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_own

auto& s_list = pg_it->second->shards_;
info.id = make_new_shard_id(pg_owner, s_list.size());
-        auto shard = std::make_shared< Shard >(info);
-        auto iter = s_list.emplace(s_list.end(), shard);
+        auto iter = s_list.emplace(s_list.end(), std::make_unique< Shard >(info));
LOGDEBUG("Creating Shard [{}]: in Pg [{}] of Size [{}b]", info.id & shard_mask, pg_owner, size_bytes);
auto [_, s_happened] = _shard_map.emplace(info.id, iter);
RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!");