Skip to content

Commit

Permalink
fix review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 29, 2023
1 parent 61f83e3 commit 29bbe93
Show file tree
Hide file tree
Showing 9 changed files with 50 additions and 35 deletions.
8 changes: 4 additions & 4 deletions src/lib/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@ BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id_t shard, blob_id_t
uint64_t len) const {
return _get_shard(shard).thenValue([this, blob](auto const e) -> BlobManager::Result< Blob > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _get_blob(e.value().info, blob);
return _get_blob(e.value()->info, blob);
});
}

BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob&& blob) {
return _get_shard(shard).thenValue(
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::Result< blob_id_t > {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().info.state) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value().info, std::move(blob));
if (ShardInfo::State::SEALED == e.value()->info.state) return folly::makeUnexpected(BlobError::INVALID_ARG);
return _put_blob(e.value()->info, std::move(blob));
});
}

BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) {
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullResult {
if (!e) return folly::makeUnexpected(BlobError::UNKNOWN_SHARD);
return _del_blob(e.value().info, blob);
return _del_blob(e.value()->info, blob);
});
}

Expand Down
5 changes: 3 additions & 2 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ struct Shard {

using ShardPtr = shared< Shard >;
using ShardPtrList = std::list< ShardPtr >;
using ShardIterator = ShardPtrList::iterator;

struct PG {
explicit PG(PGInfo info) : pg_info_(std::move(info)) {}
Expand Down Expand Up @@ -72,7 +73,7 @@ class HomeObjectImpl : public HomeObject,
virtual BlobManager::Result< Blob > _get_blob(ShardInfo const&, blob_id_t) const = 0;
virtual BlobManager::NullResult _del_blob(ShardInfo const&, blob_id_t) = 0;
///
folly::Future< ShardManager::Result< Shard > > _get_shard(shard_id_t id) const;
folly::Future< ShardManager::Result< ShardPtr > > _get_shard(shard_id_t id) const;
auto _defer() const { return folly::makeSemiFuture().via(folly::getGlobalCPUExecutor()); }

virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< std::string, std::less<> > peers) = 0;
Expand All @@ -91,7 +92,7 @@ class HomeObjectImpl : public HomeObject,
std::map< pg_id_t, shared< PG > > _pg_map;

mutable std::shared_mutex _shard_lock;
std::map< shard_id_t, ShardPtr > _shard_map;
std::map< shard_id_t, ShardIterator > _shard_map;
///

public:
Expand Down
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,19 +90,19 @@ void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte
homestore::superblk< shard_info_superblk > sb;
sb.load(buf, mblk);

bool pg_is_recovery = false;
bool pg_is_recovered = false;
{
std::scoped_lock lock_guard(_pg_lock);
pg_is_recovery = _pg_map.find(sb->placement_group) != _pg_map.end();
pg_is_recovered = _pg_map.find(sb->placement_group) != _pg_map.end();
}

if (pg_is_recovery) {
if (pg_is_recovered) {
auto hs_shard = std::make_shared< HS_Shard >(sb);
add_new_shard_to_map(hs_shard);
return;
}

// There is no guarantee that pg info will be recovery before shard recovery
// There is no guarantee that pg info will be recovered before shard recovery
std::scoped_lock lock_guard(recovery_mutex_);
pending_recovery_shards_[sb->placement_group].push_back(std::move(sb));
}
Expand Down
5 changes: 2 additions & 3 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ class HSHomeObject : public HomeObjectImpl {
static std::string serialize_shard_info(const ShardInfo& info);
void add_new_shard_to_map(ShardPtr shard);
void update_shard_in_map(const ShardInfo& shard_info);
void do_shard_message_commit(int64_t lsn, const ReplicationMessageHeader& header,
homestore::MultiBlkId const& blkids, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void do_shard_message_commit(int64_t lsn, ReplicationMessageHeader& header, homestore::MultiBlkId const& blkids,
sisl::blob value, cintrusive< homestore::repl_req_ctx >& hs_ctx);
// recover part
void register_homestore_metablk_callback();
void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie);
Expand Down
12 changes: 6 additions & 6 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header
auto value_buf = iomanager.iobuf_alloc(512, value.size);
value.iovs.push_back(iovec{.iov_base = value_buf, .iov_len = value.size});
// header will be released when this function returns, but we still need the header when async_read() finished.
const ReplicationMessageHeader msg_header = *r_cast< const ReplicationMessageHeader* >(header.bytes);
ReplicationMessageHeader msg_header = *r_cast< const ReplicationMessageHeader* >(header.bytes);
repl_dev->async_read(blkids, value, value.size).thenValue([this, lsn, msg_header, blkids, value](auto&& err) {
if (err) {
LOGWARNMOD(homeobject, "failed to read data from homestore pba, lsn:{}", lsn);
Expand All @@ -142,7 +142,7 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& header
#endif
}

void HSHomeObject::do_shard_message_commit(int64_t lsn, const ReplicationMessageHeader& header,
void HSHomeObject::do_shard_message_commit(int64_t lsn, ReplicationMessageHeader& header,
homestore::MultiBlkId const& blkids, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< ShardManager::Result< ShardInfo > >* ctx{nullptr};
Expand Down Expand Up @@ -190,8 +190,8 @@ void HSHomeObject::add_new_shard_to_map(ShardPtr shard) {
auto pg_iter = _pg_map.find(shard->info.placement_group);
RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info");
auto& shards = pg_iter->second->shards_;
shards.emplace_back(shard);
auto [_, happened] = _shard_map.emplace(shard->info.id, shard);
auto iter = shards.emplace(shards.end(), shard);
auto [_, happened] = _shard_map.emplace(shard->info.id, iter);
RELEASE_ASSERT(happened, "duplicated shard info");

// following part gives follower members a chance to catch up shard sequence num;
Expand All @@ -203,15 +203,15 @@ void HSHomeObject::update_shard_in_map(const ShardInfo& shard_info) {
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(shard_info.id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Missing shard info");
auto hs_shard = dp_cast< HS_Shard >(shard_iter->second);
auto hs_shard = dp_cast< HS_Shard >(*(shard_iter->second));
hs_shard->update_info(shard_info);
}

ShardManager::Result< homestore::chunk_num_t > HSHomeObject::get_shard_chunk(shard_id_t id) const {
std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(id);
if (shard_iter == _shard_map.end()) { return folly::makeUnexpected(ShardError::UNKNOWN_SHARD); }
auto hs_shard = dp_cast< HS_Shard >(shard_iter->second);
auto hs_shard = dp_cast< HS_Shard >(*(shard_iter->second));
return hs_shard->sb_->chunk_id;
}

Expand Down
21 changes: 16 additions & 5 deletions src/lib/homestore_backend/replication_message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,26 @@ struct ReplicationMessageHeader {
uint32_t payload_crc;
uint8_t reserved_pad[4]{};
uint32_t header_crc;
void seal() { header_crc = calculate_crc(); }
void seal() {
header_crc = 0;
header_crc = calculate_crc();
}

bool corrupted() const {
return magic_num != HOMEOBJECT_REPLICATION_MAGIC ||
protocol_version != HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1 || header_crc != calculate_crc();
bool corrupted() {
if (magic_num != HOMEOBJECT_REPLICATION_MAGIC ||
protocol_version != HOMEOBJECT_REPLICATION_PROTOCOL_VERSION_V1) {
return true;
}

auto saved_crc = header_crc;
header_crc = 0;
bool corrupted = (saved_crc != calculate_crc());
header_crc = saved_crc;
return corrupted;
}

uint32_t calculate_crc() const {
return crc32_ieee(init_crc32, r_cast< const unsigned char* >(this), sizeof(*this) - sizeof(header_crc));
return crc32_ieee(init_crc32, r_cast< const unsigned char* >(this), sizeof(*this));
}
};

Expand Down
10 changes: 7 additions & 3 deletions src/lib/homestore_backend/tests/test_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,16 @@ TEST_F(ShardManagerTesting, MockSealShard) {
}

class FixtureAppWithRecovery : public FixtureApp {
std::string fpath_{"/tmp/test_shard_manager.data.{}" + std::to_string(rand())};

public:
std::list< std::filesystem::path > devices() const override {
const std::string fpath{"/tmp/test_homestore.data"};
auto device_info = std::list< std::filesystem::path >();
device_info.emplace_back(std::filesystem::canonical(fpath));
device_info.emplace_back(std::filesystem::canonical(fpath_));
return device_info;
}

std::string path() const { return fpath_; }
};

class ShardManagerTestingRecovery : public ::testing::Test {
Expand All @@ -238,7 +241,8 @@ class ShardManagerTestingRecovery : public ::testing::Test {

TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
// prepare the env first;
const std::string fpath{"/tmp/test_homestore.data"};
auto app_with_recovery = dp_cast< FixtureAppWithRecovery >(app);
const std::string fpath = app_with_recovery->path();
if (std::filesystem::exists(fpath)) { std::filesystem::remove(fpath); }
LOGINFO("creating device files with size {} ", homestore::in_bytes(2 * Gi));
LOGINFO("creating {} device file", fpath);
Expand Down
6 changes: 3 additions & 3 deletions src/lib/memory_backend/mem_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_create_shard(pg_id_t pg_own
auto& s_list = pg_it->second->shards_;
info.id = make_new_shard_id(pg_owner, s_list.size());
auto shard = std::make_shared< Shard >(info);
s_list.emplace(s_list.end(), shard);
auto iter = s_list.emplace(s_list.end(), shard);
LOGDEBUG("Creating Shard [{}]: in Pg [{}] of Size [{}b]", info.id & shard_mask, pg_owner, size_bytes);
auto [_, s_happened] = _shard_map.emplace(info.id, shard);
auto [_, s_happened] = _shard_map.emplace(info.id, iter);
RELEASE_ASSERT(s_happened, "Duplicate Shard insertion!");
}
auto [it, happened] = index_.try_emplace(info.id, std::make_unique< ShardIndex >());
Expand All @@ -31,7 +31,7 @@ ShardManager::Result< ShardInfo > MemoryHomeObject::_seal_shard(shard_id_t id) {
auto lg = std::scoped_lock(_shard_lock);
auto shard_it = _shard_map.find(id);
RELEASE_ASSERT(_shard_map.end() != shard_it, "Missing ShardIterator!");
auto& shard_info = shard_it->second->info;
auto& shard_info = (*shard_it->second)->info;
shard_info.state = ShardInfo::State::SEALED;
return shard_info;
}
Expand Down
10 changes: 5 additions & 5 deletions src/lib/shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,23 +31,23 @@ ShardManager::AsyncResult< InfoList > HomeObjectImpl::list_shards(pg_id_t pgid)
ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::seal_shard(shard_id_t id) {
return _get_shard(id).thenValue([this](auto const e) mutable -> ShardManager::Result< ShardInfo > {
if (!e) return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
if (ShardInfo::State::SEALED == e.value().info.state) return e.value().info;
return _seal_shard(e.value().info.id);
if (ShardInfo::State::SEALED == e.value()->info.state) return e.value()->info;
return _seal_shard(e.value()->info.id);
});
}

ShardManager::AsyncResult< ShardInfo > HomeObjectImpl::get_shard(shard_id_t id) const {
return _get_shard(id).thenValue([this](auto const e) mutable -> ShardManager::Result< ShardInfo > {
if (!e) { return folly::makeUnexpected(e.error()); }
return e.value().info;
return e.value()->info;
});
}

///
// This is used as a first call for many operations and initializes the Future.
//
folly::Future< ShardManager::Result< Shard > > HomeObjectImpl::_get_shard(shard_id_t id) const {
return _defer().thenValue([this, id](auto) -> ShardManager::Result< Shard > {
folly::Future< ShardManager::Result< ShardPtr > > HomeObjectImpl::_get_shard(shard_id_t id) const {
return _defer().thenValue([this, id](auto) -> ShardManager::Result< ShardPtr > {
auto lg = std::shared_lock(_shard_lock);
if (auto it = _shard_map.find(id); _shard_map.end() != it) return *(it->second);
return folly::makeUnexpected(ShardError::UNKNOWN_SHARD);
Expand Down

0 comments on commit 29bbe93

Please sign in to comment.