Skip to content

Commit

Permalink
refactor to use HS replication context
Browse files Browse the repository at this point in the history
  • Loading branch information
zichanglai committed Sep 28, 2023
1 parent 9b1679b commit 514efcd
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 306 deletions.
11 changes: 5 additions & 6 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,12 @@ inline shard_id_t make_new_shard_id(pg_id_t pg, shard_id_t next_shard) {

struct Shard {
explicit Shard(ShardInfo info) : info(std::move(info)) {}
virtual ~Shard() = default;
ShardInfo info;
uint16_t chunk_id;
void* metablk_cookie{nullptr};
};

using ShardList = std::list< Shard >;
using ShardIterator = ShardList::iterator;
using ShardPtr = shared< Shard >;
using ShardPtrList = std::list< ShardPtr >;

struct PG {
explicit PG(PGInfo info) : pg_info_(std::move(info)) {}
Expand All @@ -56,7 +55,7 @@ struct PG {

PGInfo pg_info_;
uint64_t shard_sequence_num_{0};
ShardList shards_;
ShardPtrList shards_;
};

class HomeObjectImpl : public HomeObject,
Expand Down Expand Up @@ -92,7 +91,7 @@ class HomeObjectImpl : public HomeObject,
std::map< pg_id_t, shared< PG > > _pg_map;

mutable std::shared_mutex _shard_lock;
std::map< shard_id_t, ShardIterator > _shard_map;
std::map< shard_id_t, ShardPtr > _shard_map;
///

public:
Expand Down
45 changes: 27 additions & 18 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@

namespace homeobject {

const std::string HSHomeObject::s_shard_info_sub_type = "shard_info";

extern std::shared_ptr< HomeObject > init_homeobject(std::weak_ptr< HomeObjectApplication >&& application) {
LOGINFOMOD(homeobject, "Initializing HomeObject");
auto instance = std::make_shared< HSHomeObject >(std::move(application));
Expand Down Expand Up @@ -41,7 +39,6 @@ void HSHomeObject::init_homestore() {
}

chunk_selector_ = std::make_shared< HeapChunkSelector >();

using namespace homestore;
bool need_format = HomeStore::instance()
->with_index_service(nullptr)
Expand All @@ -63,19 +60,17 @@ void HSHomeObject::init_homestore() {
{HS_SERVICE::INDEX, hs_format_params{.size_pct = 5.0}},
});
}
LOGINFO("Initialize and start HomeStore is successfully");
}

void HSHomeObject::register_homestore_metablk_callback() {
// register some callbacks for metadata recovery;
using namespace homestore;
HomeStore::instance()->meta_service().register_handler(
HSHomeObject::s_shard_info_sub_type,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_shard_meta_blk_found(mblk, buf, size);
},
"ShardManager",
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf); },

[this](bool success) { on_shard_meta_blk_recover_completed(success); },
true);
[this](bool success) { on_shard_meta_blk_recover_completed(success); }, true);

HomeStore::instance()->meta_service().register_handler(
"PGManager",
Expand All @@ -91,23 +86,37 @@ HSHomeObject::~HSHomeObject() {
iomanager.stop();
}

void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
auto shard = deserialize_shard(r_cast< const char* >(buf.bytes()), size);
shard.metablk_cookie = mblk;
// As shard info in the homestore metablk is always the latest state(OPEN or SEALED),
// we can always create a shard from this shard info and once shard is deleted, the associated metablk will be
// deleted too.
do_commit_new_shard(shard);
}
void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf) {
homestore::superblk< shard_info_superblk > sb;
sb.load(buf, mblk);

bool pg_is_recovery = false;
{
std::scoped_lock lock_guard(_pg_lock);
pg_is_recovery = _pg_map.find(sb->placement_group) != _pg_map.end();
}

if (pg_is_recovery) {
auto hs_shard = std::make_shared< HS_Shard >(sb);
add_new_shard_to_map(hs_shard);
return;
}

// There is no guarantee that pg info will be recovery before shard recovery
std::scoped_lock lock_guard(recovery_mutex_);
pending_recovery_shards_[sb->placement_group].push_back(std::move(sb));
}

void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) {
// Find all shard with opening state and excluede their binding chunks from the HeapChunkSelector;
RELEASE_ASSERT(pending_recovery_shards_.empty(), "some shards is still pending on recovery");
std::unordered_set< homestore::chunk_num_t > excluding_chunks;
std::scoped_lock lock_guard(_pg_lock);
for (auto& pair : _pg_map) {
for (auto& shard : pair.second->shards_) {
if (shard.info.state == ShardInfo::State::OPEN) { excluding_chunks.emplace(shard.chunk_id); }
if (shard->info.state == ShardInfo::State::OPEN) {
excluding_chunks.emplace(dp_cast< HS_Shard >(shard)->sb_->chunk_id);
}
}
}

Expand Down
60 changes: 45 additions & 15 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "heap_chunk_selector.h"
#include "lib/homeobject_impl.hpp"
#include "replication_message.hpp"

namespace homestore {
struct meta_blk;
Expand All @@ -17,8 +18,6 @@ struct meta_blk;
namespace homeobject {

class HSHomeObject : public HomeObjectImpl {
std::shared_ptr< HeapChunkSelector > chunk_selector_;
private:
/// Overridable Helpers
ShardManager::Result< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
ShardManager::Result< ShardInfo > _seal_shard(shard_id_t) override;
Expand Down Expand Up @@ -46,6 +45,18 @@ class HSHomeObject : public HomeObjectImpl {
peer_id_t replica_set_uuid;
pg_members members[1]; // ISO C++ forbids zero-size array
};

struct shard_info_superblk {
shard_id_t id;
pg_id_t placement_group;
ShardInfo::State state;
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
uint64_t total_capacity_bytes;
uint64_t deleted_capacity_bytes;
homestore::chunk_num_t chunk_id;
};
#pragma pack()

struct HS_PG : public PG {
Expand All @@ -59,35 +70,54 @@ class HSHomeObject : public HomeObjectImpl {
static PGInfo pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb);
};

struct HS_Shard : public Shard {
homestore::superblk< shard_info_superblk > sb_;
HS_Shard(ShardInfo info, homestore::chunk_num_t chunk_id);
HS_Shard(homestore::superblk< shard_info_superblk > const& sb);
virtual ~HS_Shard() = default;

void update_info(const ShardInfo& info);
void write_sb();
static ShardInfo shard_info_from_sb(homestore::superblk< shard_info_superblk > const& sb);
};

private:
shared< HeapChunkSelector > chunk_selector_;
std::shared_mutex recovery_mutex_;
std::map< pg_id_t, std::list< homestore::superblk< shard_info_superblk > > > pending_recovery_shards_;

private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

void add_pg_to_map(shared< HS_PG > hs_pg);
shard_id_t generate_new_shard_id(pg_id_t pg);
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t);
std::string serialize_shard(const Shard& shard) const;
Shard deserialize_shard(const char* shard_info_str, size_t size) const;
void do_commit_new_shard(const Shard& shard);
void do_commit_seal_shard(const Shard& shard);
void register_homestore_metablk_callback();
void* get_shard_metablk(shard_id_t id) const;

static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size);
static std::string serialize_shard_info(const ShardInfo& info);
void add_new_shard_to_map(ShardPtr shard);
void update_shard_in_map(const ShardInfo& shard_info);
void do_shard_message_commit(int64_t lsn, const ReplicationMessageHeader& header,
homestore::MultiBlkId const& blkids, sisl::blob value,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
// recover part
static const std::string s_shard_info_sub_type;
void register_homestore_metablk_callback();
void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie);
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf, size_t size);
void on_shard_meta_blk_recover_completed(bool success);
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_shard_meta_blk_recover_completed(bool success);

public:
using HomeObjectImpl::HomeObjectImpl;
~HSHomeObject();

void init_homestore();

void on_shard_message_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
homestore::MultiBlkId const& blkids,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_shard_message_commit(int64_t lsn, sisl::blob const& header, homestore::MultiBlkId const& blkids,
homestore::ReplDev* repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx);

ShardManager::Result< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const;

ShardManager::Result< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const;
shared< HeapChunkSelector > chunk_selector() { return chunk_selector_; }
};

} // namespace homeobject
14 changes: 13 additions & 1 deletion src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,17 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
return;
}
add_pg_to_map(std::make_shared< HS_PG >(pg_sb, std::move(v.value())));

// check if any shard recovery is pending by this pg;
std::scoped_lock lock_guard(recovery_mutex_);
auto iter = pending_recovery_shards_.find(pg_sb->id);
if (iter != pending_recovery_shards_.end()) {
for (auto& sb : iter->second) {
auto hs_shard = std::make_shared< HS_Shard >(sb);
add_new_shard_to_map(hs_shard);
}
pending_recovery_shards_.erase(iter);
}
});
}

Expand All @@ -129,6 +140,7 @@ PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superbl
for (uint32_t i{0}; i < sb->num_members; ++i) {
pginfo.members.emplace(sb->members[i].id, std::string(sb->members[i].name), sb->members[i].priority);
}
pginfo.replica_set_uuid = sb->replica_set_uuid;
return pginfo;
}

Expand All @@ -154,4 +166,4 @@ HSHomeObject::HS_PG::HS_PG(homestore::superblk< HSHomeObject::pg_info_superblk >
shared< homestore::ReplDev > rdev) :
PG{pg_info_from_sb(sb)}, pg_sb_{sb}, repl_dev_{std::move(rdev)} {}

} // namespace homeobject
} // namespace homeobject
Loading

0 comments on commit 514efcd

Please sign in to comment.