rework recovery (#152)
JacksonYao287 authored Mar 14, 2024
1 parent a0df650 commit 34f7a1a
Showing 6 changed files with 80 additions and 124 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "1.0.10"
version = "1.0.11"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
49 changes: 17 additions & 32 deletions src/lib/homestore_backend/hs_homeobject.cpp
@@ -150,10 +150,24 @@ void HSHomeObject::init_homestore() {
auto const new_id = app->discover_svcid(_our_id);
RELEASE_ASSERT(new_id == _our_id, "Received new SvcId [{}] AFTER recovery of [{}]?!", to_string(new_id),
to_string(_our_id));
recover_pg();
recover_shard();
}
initialize_chunk_selector();

// recover PG
HomeStore::instance()->meta_service().register_handler(
_pg_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_pg_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true);
HomeStore::instance()->meta_service().read_sub_sb(_pg_meta_name);

// recover shard
HomeStore::instance()->meta_service().register_handler(
_shard_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf); },
[this](bool success) { on_shard_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_shard_meta_name);

recovery_done_ = true;
LOGI("Initialize and start HomeStore is successfully");

@@ -205,21 +219,6 @@ void HSHomeObject::register_homestore_metablk_callback() {
LOGI("Found existing SvcId: [{}]", to_string(_our_id));
},
nullptr, true);

HomeStore::instance()->meta_service().register_handler(
_shard_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
m_shard_sb_bufs.emplace_back(std::pair(std::move(buf), voidptr_cast(mblk)));
},
nullptr, true);

HomeStore::instance()->meta_service().register_handler(
_pg_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
m_pg_sb_bufs.emplace_back(std::pair(std::move(buf), voidptr_cast(mblk)));
},
// TODO: move "repl_dev" to homestore::repl_dev and "index" to homestore::index
nullptr, true);
}

HSHomeObject::~HSHomeObject() {
@@ -235,20 +234,6 @@ HSHomeObject::~HSHomeObject() {
iomanager.stop();
}

void HSHomeObject::initialize_chunk_selector() {
std::unordered_set< homestore::chunk_num_t > excluding_chunks;
std::scoped_lock lock_guard(_pg_lock);
for (auto& pair : _pg_map) {
for (auto& shard : pair.second->shards_) {
if (shard->info.state == ShardInfo::State::OPEN) {
excluding_chunks.emplace(d_cast< HS_Shard* >(shard.get())->sb_->chunk_id);
}
}
}

chunk_selector_->build_per_dev_chunk_heap(excluding_chunks);
}

HomeObjectStats HSHomeObject::_get_stats() const {
HomeObjectStats stats;
auto const& repl_svc = homestore::hs()->repl_service();
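Aside (not part of the commit): the hunk above replaces the buffered recover_pg()/recover_shard() path with handlers that are registered and replayed in a fixed order during init_homestore(): PG superblocks first, then shard superblocks, with the shard completion callback finishing chunk-selector setup. The self-contained C++ sketch below models only that ordering; MetaService, its register_handler/read_sub_sb signatures, and the string payloads are simplified stand-ins, not the real HomeStore meta service API.

#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a meta service that persists superblocks per subsystem and
// replays them through registered handlers on recovery.
class MetaService {
public:
    using found_cb_t = std::function< void(const std::string&) >;
    using done_cb_t = std::function< void(bool) >;

    // Persisted superblocks, keyed by subsystem name (e.g. "pg", "shard").
    std::map< std::string, std::vector< std::string > > persisted;

    void register_handler(const std::string& name, found_cb_t on_found, done_cb_t on_done) {
        handlers_[name] = {std::move(on_found), std::move(on_done)};
    }

    // Replays every persisted superblock of `name` through its handler, then fires the
    // completion callback: the synchronous "read it back now" step the new flow relies on.
    void read_sub_sb(const std::string& name) {
        auto& h = handlers_.at(name);
        for (auto const& sb : persisted[name]) { h.first(sb); }
        if (h.second) { h.second(true); }
    }

private:
    std::map< std::string, std::pair< found_cb_t, done_cb_t > > handlers_;
};

int main() {
    MetaService svc;
    svc.persisted["pg"] = {"pg-1"};
    svc.persisted["shard"] = {"shard-1 (OPEN)", "shard-2 (SEALED)"};

    // PG superblocks are replayed first, so shards recovered afterwards can attach to a live PG.
    svc.register_handler("pg", [](const std::string& sb) { std::cout << "recovered " << sb << '\n'; }, nullptr);
    svc.read_sub_sb("pg");

    // Shard superblocks come second; the completion callback is where the real code rebuilds
    // the chunk selector, excluding chunks still owned by OPEN shards.
    svc.register_handler(
        "shard", [](const std::string& sb) { std::cout << "recovered " << sb << '\n'; },
        [](bool ok) { std::cout << "shard recovery done, success=" << ok << '\n'; });
    svc.read_sub_sb("shard");
    return 0;
}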
8 changes: 3 additions & 5 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -62,8 +62,6 @@ class HSHomeObject : public HomeObjectImpl {
std::shared_ptr< BlobIndexTable > index_table;
};
std::unordered_map< std::string, PgIndexTable > index_table_pg_map_;
std::vector< std::pair< sisl::byte_view, void* > > m_pg_sb_bufs;
std::vector< std::pair< sisl::byte_view, void* > > m_shard_sb_bufs;

public:
#pragma pack(1)
@@ -248,9 +246,9 @@ class HSHomeObject : public HomeObjectImpl {

// recover part
void register_homestore_metablk_callback();
void initialize_chunk_selector();
void recover_pg();
void recover_shard();
void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie);
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_shard_meta_blk_recover_completed(bool success);

void persist_pg_sb();

45 changes: 21 additions & 24 deletions src/lib/homestore_backend/hs_pg_manager.cpp
@@ -192,31 +192,28 @@ PGInfo HSHomeObject::deserialize_pg_info(const unsigned char* json_str, size_t s
return pg_info;
}

void HSHomeObject::recover_pg() {
for (auto const& [buf, mblk] : m_pg_sb_bufs) {
homestore::superblk< pg_info_superblk > pg_sb(_pg_meta_name);
pg_sb.load(buf, mblk);

auto v = hs_repl_service().get_repl_dev(pg_sb->replica_set_uuid);
if (v.hasError()) {
// TODO: We need to raise an alert here, since without pg repl_dev all operations on that pg will fail
LOGE("open_repl_dev for group_id={} has failed", boost::uuids::to_string(pg_sb->replica_set_uuid));
return;
}
auto pg_id = pg_sb->id;
auto uuid_str = boost::uuids::to_string(pg_sb->index_table_uuid);
auto hs_pg = std::make_unique< HS_PG >(std::move(pg_sb), std::move(v.value()));
// During PG recovery check if index is already recoverd else
// add entry in map, so that index recovery can update the PG.
std::scoped_lock lg(index_lock_);
auto it = index_table_pg_map_.find(uuid_str);
RELEASE_ASSERT(it != index_table_pg_map_.end(), "IndexTable should be recovered before PG");
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;

add_pg_to_map(std::move(hs_pg));
void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
homestore::superblk< pg_info_superblk > pg_sb(_pg_meta_name);
pg_sb.load(buf, meta_cookie);

auto v = hs_repl_service().get_repl_dev(pg_sb->replica_set_uuid);
if (v.hasError()) {
// TODO: We need to raise an alert here, since without pg repl_dev all operations on that pg will fail
LOGE("open_repl_dev for group_id={} has failed", boost::uuids::to_string(pg_sb->replica_set_uuid));
return;
}
m_pg_sb_bufs.clear();
auto pg_id = pg_sb->id;
auto uuid_str = boost::uuids::to_string(pg_sb->index_table_uuid);
auto hs_pg = std::make_unique< HS_PG >(std::move(pg_sb), std::move(v.value()));
// During PG recovery check if index is already recoverd else
// add entry in map, so that index recovery can update the PG.
std::scoped_lock lg(index_lock_);
auto it = index_table_pg_map_.find(uuid_str);
RELEASE_ASSERT(it != index_table_pg_map_.end(), "IndexTable should be recovered before PG");
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;

add_pg_to_map(std::move(hs_pg));
}

PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb) {
22 changes: 16 additions & 6 deletions src/lib/homestore_backend/hs_shard_manager.cpp
@@ -273,13 +273,23 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
}
}

void HSHomeObject::recover_shard() {
for (auto& [buf, mblk] : m_shard_sb_bufs) {
homestore::superblk< shard_info_superblk > sb(_shard_meta_name);
sb.load(buf, mblk);
add_new_shard_to_map(std::make_unique< HS_Shard >(std::move(sb)));
void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf) {
homestore::superblk< shard_info_superblk > sb(_shard_meta_name);
sb.load(buf, mblk);
add_new_shard_to_map(std::make_unique< HS_Shard >(std::move(sb)));
}

void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) {
std::unordered_set< homestore::chunk_num_t > excluding_chunks;
std::scoped_lock lock_guard(_pg_lock);
for (auto& pair : _pg_map) {
for (auto& shard : pair.second->shards_) {
if (shard->info.state == ShardInfo::State::OPEN) {
excluding_chunks.emplace(d_cast< HS_Shard* >(shard.get())->sb_->chunk_id);
}
}
}
m_shard_sb_bufs.clear();
chunk_selector_->build_per_dev_chunk_heap(excluding_chunks);
}

void HSHomeObject::add_new_shard_to_map(ShardPtr&& shard) {
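Aside (not part of the commit): on_shard_meta_blk_recover_completed() above collects the chunk ids still owned by OPEN shards and passes them to build_per_dev_chunk_heap(), so those chunks stay out of the pool used for new shards. The sketch below illustrates that exclusion idea with hypothetical types; Chunk, the dev_id field, and this free-standing build_per_dev_chunk_heap() signature are stand-ins, not the real ChunkSelector interface.

#include <cstdint>
#include <iostream>
#include <map>
#include <queue>
#include <unordered_set>
#include <utility>
#include <vector>

using chunk_num_t = std::uint16_t;

// Hypothetical chunk descriptor; the real chunk metadata lives inside HomeStore.
struct Chunk {
    chunk_num_t id;
    std::uint32_t dev_id;
    std::uint64_t free_bytes;
};

// Build one max-heap of candidate chunks per device, ordered by free space, skipping chunks
// that recovered OPEN shards still own (those shards keep appending to their original chunk).
std::map< std::uint32_t, std::priority_queue< std::pair< std::uint64_t, chunk_num_t > > >
build_per_dev_chunk_heap(const std::vector< Chunk >& all_chunks,
                         const std::unordered_set< chunk_num_t >& excluding_chunks) {
    std::map< std::uint32_t, std::priority_queue< std::pair< std::uint64_t, chunk_num_t > > > heaps;
    for (auto const& c : all_chunks) {
        if (excluding_chunks.count(c.id) != 0) continue; // chunk is pinned to an OPEN shard
        heaps[c.dev_id].emplace(c.free_bytes, c.id);
    }
    return heaps;
}

int main() {
    std::vector< Chunk > chunks{{1, 0, 100}, {2, 0, 300}, {3, 1, 200}};
    auto heaps = build_per_dev_chunk_heap(chunks, {2});
    // Chunk 2 is excluded because an OPEN shard owns it; chunk 1 is the best remaining chunk on dev 0.
    std::cout << "dev 0 selectable chunk: " << heaps[0].top().second << '\n';
    return 0;
}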
78 changes: 22 additions & 56 deletions src/lib/homestore_backend/tests/hs_shard_tests.cpp
@@ -125,41 +125,16 @@ TEST_F(TestFixture, MockSealShard) {
}
#endif

class FixtureAppWithRecovery : public FixtureApp {
std::string fpath_{"/tmp/test_shard_manager.data." + std::to_string(rand())};

public:
std::list< std::filesystem::path > devices() const override {
auto device_info = std::list< std::filesystem::path >();
device_info.emplace_back(std::filesystem::canonical(fpath_));
return device_info;
}

std::string path() const { return fpath_; }
};

class ShardManagerTestingRecovery : public ::testing::Test {
public:
void SetUp() override { app = std::make_shared< FixtureAppWithRecovery >(); }

void SetUp() override { app = std::make_shared< FixtureApp >(); }
void TearDown() override { app->clean(); }

protected:
std::shared_ptr< FixtureApp > app;
};

// TODO: enable the following test case after we fix raft repl dev recovery issue.
/*
TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
// prepare the env first;
auto app_with_recovery = dp_cast< FixtureAppWithRecovery >(app);
const std::string fpath = app_with_recovery->path();
if (std::filesystem::exists(fpath)) { std::filesystem::remove(fpath); }
LOGI("creating device files with size {} ", homestore::in_bytes(2 * Gi));
LOGI("creating {} device file", fpath);
std::ofstream ofs{fpath, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(fpath, 2 * Gi);
homeobject::pg_id_t _pg_id{1u};
homeobject::peer_id_t _peer1;
homeobject::peer_id_t _peer2;
@@ -187,6 +162,7 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
_home_object.reset();
LOGI("restart home_object");
_home_object = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app));
std::this_thread::sleep_for(std::chrono::seconds{5});
homeobject::HSHomeObject* ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get());
// check PG after recovery.
EXPECT_TRUE(ho->_pg_map.size() == 1);
@@ -201,15 +177,15 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
EXPECT_EQ(ShardInfo::State::OPEN, check_shard->info.state);

auto hs_shard = d_cast< homeobject::HSHomeObject::HS_Shard* >(check_shard);
EXPECT_TRUE(hs_shard->info == shard_info);
EXPECT_TRUE(hs_shard->sb_->id == shard_info.id);
EXPECT_TRUE(hs_shard->sb_->placement_group == shard_info.placement_group);
EXPECT_TRUE(hs_shard->sb_->state == shard_info.state);
EXPECT_TRUE(hs_shard->sb_->created_time == shard_info.created_time);
EXPECT_TRUE(hs_shard->sb_->last_modified_time == shard_info.last_modified_time);
EXPECT_TRUE(hs_shard->sb_->available_capacity_bytes == shard_info.available_capacity_bytes);
EXPECT_TRUE(hs_shard->sb_->total_capacity_bytes == shard_info.total_capacity_bytes);
EXPECT_TRUE(hs_shard->sb_->deleted_capacity_bytes == shard_info.deleted_capacity_bytes);
auto& recovered_shard_info = hs_shard->info;
EXPECT_TRUE(recovered_shard_info == shard_info);
EXPECT_TRUE(recovered_shard_info.placement_group == shard_info.placement_group);
EXPECT_TRUE(recovered_shard_info.state == shard_info.state);
EXPECT_TRUE(recovered_shard_info.created_time == shard_info.created_time);
EXPECT_TRUE(recovered_shard_info.last_modified_time == shard_info.last_modified_time);
EXPECT_TRUE(recovered_shard_info.available_capacity_bytes == shard_info.available_capacity_bytes);
EXPECT_TRUE(recovered_shard_info.total_capacity_bytes == shard_info.total_capacity_bytes);
EXPECT_TRUE(recovered_shard_info.deleted_capacity_bytes == shard_info.deleted_capacity_bytes);

// seal the shard when shard is recovery
e = _home_object->shard_manager()->seal_shard(shard_id).get();
@@ -221,6 +197,7 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
LOGI("restart home_object again");
// re-create the homeobject and pg infos and shard infos will be recover automatically.
_home_object = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app));
std::this_thread::sleep_for(std::chrono::seconds{5});
auto s = _home_object->shard_manager()->get_shard(shard_id).get();
ASSERT_TRUE(!!s);
EXPECT_EQ(ShardInfo::State::SEALED, s.value().state);
Expand All @@ -237,19 +214,9 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
EXPECT_EQ(2, pg_iter->second->shard_sequence_num_);
// finally close the homeobject and homestore.
_home_object.reset();
std::filesystem::remove(fpath);
}

TEST_F(ShardManagerTestingRecovery, SealedShardRecovery) {
// prepare the env first;
auto app_with_recovery = dp_cast< FixtureAppWithRecovery >(app);
const std::string fpath = app_with_recovery->path();
if (std::filesystem::exists(fpath)) { std::filesystem::remove(fpath); }
LOGI("creating device files with size {} ", homestore::in_bytes(2 * Gi));
LOGI("creating {} device file", fpath);
std::ofstream ofs{fpath, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(fpath, 2 * Gi);
homeobject::pg_id_t _pg_id{1u};
homeobject::peer_id_t _peer1;
homeobject::peer_id_t _peer2;
@@ -285,24 +252,23 @@ TEST_F(ShardManagerTestingRecovery, SealedShardRecovery) {
LOGI("restart home_object");
// re-create the homeobject and pg infos and shard infos will be recover automatically.
_home_object = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app));
std::this_thread::sleep_for(std::chrono::seconds{5});
ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get());
EXPECT_TRUE(ho->_pg_map.size() == 1);
// check shard internal state;
pg_iter = ho->_pg_map.find(_pg_id);
EXPECT_TRUE(pg_iter != ho->_pg_map.end());
EXPECT_EQ(1, pg_iter->second->shards_.size());
auto hs_shard = d_cast< homeobject::HSHomeObject::HS_Shard* >(pg_iter->second->shards_.front().get());
EXPECT_TRUE(hs_shard->info == shard_info);
EXPECT_TRUE(hs_shard->sb_->id == shard_info.id);
EXPECT_TRUE(hs_shard->sb_->placement_group == shard_info.placement_group);
EXPECT_TRUE(hs_shard->sb_->state == shard_info.state);
EXPECT_TRUE(hs_shard->sb_->created_time == shard_info.created_time);
EXPECT_TRUE(hs_shard->sb_->last_modified_time == shard_info.last_modified_time);
EXPECT_TRUE(hs_shard->sb_->available_capacity_bytes == shard_info.available_capacity_bytes);
EXPECT_TRUE(hs_shard->sb_->total_capacity_bytes == shard_info.total_capacity_bytes);
EXPECT_TRUE(hs_shard->sb_->deleted_capacity_bytes == shard_info.deleted_capacity_bytes);
auto& recovered_shard_info = hs_shard->info;
EXPECT_TRUE(recovered_shard_info == shard_info);
EXPECT_TRUE(recovered_shard_info.placement_group == shard_info.placement_group);
EXPECT_TRUE(recovered_shard_info.state == shard_info.state);
EXPECT_TRUE(recovered_shard_info.created_time == shard_info.created_time);
EXPECT_TRUE(recovered_shard_info.last_modified_time == shard_info.last_modified_time);
EXPECT_TRUE(recovered_shard_info.available_capacity_bytes == shard_info.available_capacity_bytes);
EXPECT_TRUE(recovered_shard_info.total_capacity_bytes == shard_info.total_capacity_bytes);
EXPECT_TRUE(recovered_shard_info.deleted_capacity_bytes == shard_info.deleted_capacity_bytes);
// finally close the homeobject and homestore.
_home_object.reset();
std::filesystem::remove(fpath);
}
*/
