Skip to content

Commit

Permalink
fix some review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 12, 2023
1 parent 0af7f2e commit ce58fe9
Show file tree
Hide file tree
Showing 8 changed files with 72 additions and 71 deletions.
8 changes: 4 additions & 4 deletions src/lib/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,23 @@ BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id shard, blob_id con
uint64_t len) const {
return _get_shard(shard).thenValue([this, blob](auto const e) -> BlobManager::Result< Blob > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _get_blob(e.value(), blob);
return _get_blob(e.value().info, blob);
});
}

BlobManager::AsyncResult< blob_id > HomeObjectImpl::put(shard_id shard, Blob&& blob) {
return _get_shard(shard).thenValue(
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::Result< blob_id > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value(), std::move(blob));
if (ShardInfo::State::SEALED == e.value().info.state) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value().info, std::move(blob));
});
}

BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id shard, blob_id const& blob) {
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullResult {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _del_blob(e.value(), blob);
return _del_blob(e.value().info, blob);
});
}

Expand Down
20 changes: 7 additions & 13 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,20 @@ namespace homeobject {
inline bool operator<(ShardInfo const& lhs, ShardInfo const& rhs) { return lhs.id < rhs.id; }
inline bool operator==(ShardInfo const& lhs, ShardInfo const& rhs) { return lhs.id == rhs.id; }

struct ShardInfoExt {
uint16_t chunk_id;
};

struct Shard {
explicit Shard(ShardInfo info) : info(std::move(info)) {}
ShardInfo info;
ShardInfoExt ext_info;
uint16_t chunk_id;
};

using ShardPtr = std::shared_ptr< Shard >;
using ShardPtrList = std::list< ShardPtr >;
using ShardList = std::list< Shard >;
using ShardIterator = ShardList::iterator;

struct PG {
explicit PG(PGInfo info) : pg_info(std::move(info)) {}
PGInfo pg_info;
uint64_t shard_sequence_num{0};
ShardPtrList shards;
ShardList shards;
};

class HomeObjectImpl : public HomeObject,
Expand All @@ -50,8 +46,7 @@ class HomeObjectImpl : public HomeObject,
virtual BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id) const = 0;
virtual BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) = 0;
///

folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id id) const;
folly::Future< ShardManager::Result< Shard > > _get_shard(shard_id id) const;
auto _defer() const { return folly::makeSemiFuture().via(folly::getGlobalCPUExecutor()); }

protected:
Expand All @@ -68,8 +63,9 @@ class HomeObjectImpl : public HomeObject,
std::map< pg_id, PG > _pg_map;

mutable std::shared_mutex _shard_lock;
std::map < shard_id, ShardPtr > _shard_map;
std::map < shard_id, ShardIterator > _shard_map;
///
PGManager::Result< PG > _get_pg(pg_id pg);
public:
explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application) :
_application(std::move(application)) {}
Expand Down Expand Up @@ -99,8 +95,6 @@ class HomeObjectImpl : public HomeObject,
ShardManager::AsyncResult< InfoList > list_shards(pg_id pg) const final;
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id id) final;
uint64_t get_current_timestamp();

static const std::string s_shard_info_sub_type;
/// BlobManager
BlobManager::AsyncResult< blob_id > put(shard_id shard, Blob&&) final;
BlobManager::AsyncResult< Blob > get(shard_id shard, blob_id const& blob, uint64_t off, uint64_t len) const final;
Expand Down
4 changes: 2 additions & 2 deletions src/lib/homestore/homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte
std::string shard_info_str;
shard_info_str.append(r_cast<const char*>(buf.bytes()), size);

auto shard = deserialize_shard_info(shard_info_str);
if (shard->info.state == ShardInfo::State::OPEN) {
auto shard = deserialize_shard(shard_info_str);
if (shard.info.state == ShardInfo::State::OPEN) {
// create shard;
do_commit_new_shard(shard);
} else {
Expand Down
9 changes: 4 additions & 5 deletions src/lib/homestore/homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,13 @@ class HSHomeObject : public HomeObjectImpl {
BlobManager::NullResult _del_blob(ShardInfo const&, blob_id) override;
///
private:
bool check_if_pg_exist(pg_id pg) const;
shard_id generate_new_shard_id(pg_id pg);
shard_id make_new_shard_id(pg_id pg, uint64_t sequence_num) const;
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id);
std::string serialize_shard_info(ShardPtr shard) const;
ShardPtr deserialize_shard_info(const std::string& shard_info_str) const;
void do_commit_new_shard(ShardPtr shard_info);
void do_commit_seal_shard(ShardPtr shard_info);
std::string serialize_shard(const Shard& shard) const;
Shard deserialize_shard(const std::string& shard_info_str) const;
void do_commit_new_shard(const Shard& shard);
void do_commit_seal_shard(const Shard& shard);
public:
using HomeObjectImpl::HomeObjectImpl;
~HSHomeObject();
Expand Down
66 changes: 30 additions & 36 deletions src/lib/homestore/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,6 @@ namespace homeobject {
uint64_t ShardManager::max_shard_size() { return Gi; }
uint64_t ShardManager::max_shard_num_in_pg() {return ((uint64_t)0x01) << 48;}

bool HSHomeObject::check_if_pg_exist(pg_id pg) const {
std::scoped_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg);
return (iter == _pg_map.end()) ? false : true;
}

shard_id HSHomeObject::generate_new_shard_id(pg_id pg) {
std::scoped_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg);
Expand All @@ -32,21 +26,21 @@ uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id) {
return shard_id & (0x0000FFFFFFFFFFFF);
}

std::string HSHomeObject::serialize_shard_info(ShardPtr shard) const {
std::string HSHomeObject::serialize_shard(const Shard& shard) const {
nlohmann::json j;
j["shard_info"]["shard_id"] = shard->info.id;
j["shard_info"]["pg_id"] = shard->info.placement_group;
j["shard_info"]["state"] = shard->info.state;
j["shard_info"]["created_time"] = shard->info.created_time;
j["shard_info"]["modified_time"] = shard->info.last_modified_time;
j["shard_info"]["total_capacity"] = shard->info.total_capacity_bytes;
j["shard_info"]["available_capacity"] = shard->info.available_capacity_bytes;
j["shard_info"]["deleted_capacity"] = shard->info.deleted_capacity_bytes;
j["ext_info"]["chunk_id"] = shard->ext_info.chunk_id;
j["shard_info"]["shard_id"] = shard.info.id;
j["shard_info"]["pg_id"] = shard.info.placement_group;
j["shard_info"]["state"] = shard.info.state;
j["shard_info"]["created_time"] = shard.info.created_time;
j["shard_info"]["modified_time"] = shard.info.last_modified_time;
j["shard_info"]["total_capacity"] = shard.info.total_capacity_bytes;
j["shard_info"]["available_capacity"] = shard.info.available_capacity_bytes;
j["shard_info"]["deleted_capacity"] = shard.info.deleted_capacity_bytes;
j["ext_info"]["chunk_id"] = shard.chunk_id;
return j.dump();
}

ShardPtr HSHomeObject::deserialize_shard_info(const std::string& shard_json_str) const {
Shard HSHomeObject::deserialize_shard(const std::string& shard_json_str) const {
auto shard_json = nlohmann::json::parse(shard_json_str);
ShardInfo shard_info;
shard_info.id = shard_json["shard_info"]["shard_id"].get<shard_id>();
Expand All @@ -57,18 +51,17 @@ ShardPtr HSHomeObject::deserialize_shard_info(const std::string& shard_json_str)
shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get<uint64_t>();
shard_info.total_capacity_bytes = shard_json["shard_info"]["total_capacity"].get<uint64_t>();
shard_info.deleted_capacity_bytes = shard_json["shard_info"]["deleted_capacity"].get<uint64_t>();
auto shard = std::make_shared< Shard >(shard_info);
shard->ext_info.chunk_id = shard_json["ext_info"]["chunk_id"].get<uint16_t>();
auto shard = Shard(shard_info);
shard.chunk_id = shard_json["ext_info"]["chunk_id"].get<uint16_t>();
return shard;
}

ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, uint64_t size_bytes) {
bool pg_exist = check_if_pg_exist(pg_owner);
if (!pg_exist) {
auto pg = _get_pg(pg_owner);
if (!bool(pg)) {
LOGWARN("failed to create shard with non-exist pg [{}]", pg_owner);
return folly::makeUnexpected(ShardError::UNKNOWN_PG);
}

//TODO: will update to ReplDev when ReplDev on HomeStore is ready;
auto replica_set_var = _repl_svc->get_replica_set(fmt::format("{}", pg_owner));
if (std::holds_alternative< home_replication:: ReplServiceError>(replica_set_var)) {
Expand All @@ -84,9 +77,9 @@ ShardManager::Result< ShardInfo > HSHomeObject::_create_shard(pg_id pg_owner, ui

auto new_shard_id = generate_new_shard_id(pg_owner);
auto create_time = get_current_timestamp();
auto shard = std::make_shared<Shard>(ShardInfo(new_shard_id, pg_owner, ShardInfo::State::OPEN,
create_time, create_time, size_bytes, size_bytes, 0));
std::string create_shard_message = serialize_shard_info(shard);
auto shard_info = ShardInfo(new_shard_id, pg_owner, ShardInfo::State::OPEN,
create_time, create_time, size_bytes, size_bytes, 0);
std::string create_shard_message = serialize_shard(Shard(shard_info));
//preapre msg header;
const uint32_t needed_size = sizeof(ReplicationMessageHeader) + create_shard_message.size();
auto buf = nuraft::buffer::alloc(needed_size);
Expand Down Expand Up @@ -161,39 +154,40 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header
homestore::hs()->meta_service().add_sub_sb(HSHomeObject::s_shard_info_sub_type, r_cast<const uint8_t*>(string_value.c_str()), string_value.size(), cookie);

//update in-memory shard map;
auto shard = deserialize_shard_info(string_value);
if (shard->info.state == ShardInfo::State::OPEN) {
auto shard = deserialize_shard(string_value);
if (shard.info.state == ShardInfo::State::OPEN) {
//create shard;
do_commit_new_shard(shard);
} else {
do_commit_seal_shard(shard);
}

if (promise) {
promise->setValue(ShardManager::Result<ShardInfo>(shard->info));
promise->setValue(ShardManager::Result<ShardInfo>(shard.info));
}
}

void HSHomeObject::do_commit_new_shard(ShardPtr shard) {
void HSHomeObject::do_commit_new_shard(const Shard& shard) {
std::scoped_lock lock_guard(_pg_lock, _shard_lock);
auto pg_iter = _pg_map.find(shard->info.placement_group);
auto pg_iter = _pg_map.find(shard.info.placement_group);
RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info");
pg_iter->second.shards.push_back(shard);
auto [_, happened] = _shard_map.emplace(shard->info.id, shard);
auto& shards = pg_iter->second.shards;
auto iter = shards.emplace(shards.end(), shard);
auto [_, happened] = _shard_map.emplace(shard.info.id, iter);
RELEASE_ASSERT(happened, "duplicated shard info");

//following part is must for follower members or when member is restarted;
auto sequence_num = get_sequence_num_from_shard_id(shard->info.id);
auto sequence_num = get_sequence_num_from_shard_id(shard.info.id);
if (sequence_num > pg_iter->second.shard_sequence_num) {
pg_iter->second.shard_sequence_num = sequence_num;
}
}

void HSHomeObject::do_commit_seal_shard(ShardPtr shard) {
void HSHomeObject::do_commit_seal_shard(const Shard& shard) {
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(shard->info.id);
auto shard_iter = _shard_map.find(shard.info.id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
*(shard_iter->second) = *shard;
*(shard_iter->second) = shard;
}

} // namespace homeobject
10 changes: 5 additions & 5 deletions src/lib/memory/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id pg_owner

auto& s_list = pg_it->second.shards;
info.id = (((uint64_t)pg_owner) << 48) + s_list.size();
auto shard = std::make_shared<Shard>(info);
auto iter = s_list.emplace(s_list.end(), Shard(info));
LOGDEBUG("Creating Shard [{}]: in Pg [{}] of Size [{}b]", info.id, pg_owner, size_bytes);
s_list.push_back(shard);
auto [_, s_happened] = _shard_map.emplace(info.id, shard);
auto [_, s_happened] = _shard_map.emplace(info.id, iter);
RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!");
}
auto lg = std::scoped_lock(_index_lock);
Expand All @@ -32,8 +31,9 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_seal_shard(shard_id id) {
auto lg = std::scoped_lock(_shard_lock);
auto shard_it = _shard_map.find(id);
if (_shard_map.end() == shard_it) return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
shard_it->second->info.state = ShardInfo::State::SEALED;
return shard_it->second->info;
auto& shard_info = (*shard_it->second).info;
shard_info.state = ShardInfo::State::SEALED;
return shard_info;
}

} // namespace homeobject
9 changes: 9 additions & 0 deletions src/lib/pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,13 @@ PGManager::NullAsyncResult HomeObjectImpl::replace_member(pg_id id, peer_id cons
});
}

PGManager::Result< PG > HomeObjectImpl::_get_pg(pg_id pg) {
std::scoped_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg);
if (iter == _pg_map.end()) {
return folly::makeUnexpected(PGError::UNKNOWN_PG);
}
return iter->second;
}

} // namespace homeobject
17 changes: 11 additions & 6 deletions src/lib/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id pg) cons

auto info_l = std::list< ShardInfo >();
for (auto const& shard : pg_it->second.shards) {
LOGDEBUG("Listing Shard {}", shard->info.id);
info_l.push_back(shard->info);
LOGDEBUG("Listing Shard {}", shard.info.id);
info_l.push_back(shard.info);
}
return info_l;
});
Expand All @@ -36,15 +36,20 @@ ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id id) {
}


ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) const { return _get_shard(id); }
ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id id) const {
return _get_shard(id).thenValue([this](auto const e) mutable -> ShardManager::Result< ShardInfo > {
if (!e) { return folly::makeUnexpected(e.error());}
return e.value().info;
});
}

///
// This is used as a first call for many operations and initializes the Future.
//
folly::Future< ShardManager::Result< ShardInfo > > HomeObjectImpl::_get_shard(shard_id id) const {
return _defer().thenValue([this, id](auto) -> ShardManager::Result< ShardInfo > {
folly::Future< ShardManager::Result< Shard > > HomeObjectImpl::_get_shard(shard_id id) const {
return _defer().thenValue([this, id](auto) -> ShardManager::Result< Shard > {
auto lg = std::shared_lock(_shard_lock);
if (auto it = _shard_map.find(id); _shard_map.end() != it) return it->second->info;
if (auto it = _shard_map.find(id); _shard_map.end() != it) return *(it->second);
return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
});
}
Expand Down

0 comments on commit ce58fe9

Please sign in to comment.