Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support spare replicas in raft test framework #226

Merged
merged 5 commits into from
Nov 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.7"
version = "2.1.8"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
2 changes: 1 addition & 1 deletion src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ struct PGInfo {
pg_id_t id;
mutable MemberSet members;
peer_id_t replica_set_uuid;
u_int64_t size;
uint64_t size;

auto operator<=>(PGInfo const& rhs) const { return id <=> rhs.id; }
auto operator==(PGInfo const& rhs) const { return id == rhs.id; }
Expand Down
12 changes: 12 additions & 0 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,15 @@ target_link_libraries(homestore_test PUBLIC
)
add_test(NAME HomestoreTest COMMAND homestore_test -csv error --executor immediate --config_path ./ --override_config homestore_config.consensus.snapshot_freq_distance:0)
set_property(TEST HomestoreTest PROPERTY RUN_SERIAL 1)

add_executable (homestore_test_dynamic)
target_sources(homestore_test_dynamic PRIVATE
$<TARGET_OBJECTS:homestore_tests_dynamic>
)
target_link_libraries(homestore_test_dynamic PUBLIC
homeobject_homestore
${COMMON_TEST_DEPS}
)

add_test(NAME HomestoreTestDynamic COMMAND homestore_test_dynamic -csv error --executor immediate --config_path ./ --override_config homestore_config.consensus.snapshot_freq_distance:0)

5 changes: 5 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,4 +323,9 @@ sisl::io_blob_safe& HSHomeObject::get_pad_buf(uint32_t pad_len) {
return zpad_bufs_[idx];
}

// Returns true iff a PG with the given id is currently registered in the PG map.
// Takes the PG map's shared (reader) lock, so it is safe to call concurrently
// with PG creation/removal.
bool HSHomeObject::pg_exists(pg_id_t pg_id) const {
    std::shared_lock lock_guard(_pg_lock);
    return _pg_map.find(pg_id) != _pg_map.end();
}

} // namespace homeobject
26 changes: 18 additions & 8 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,13 @@ class HSHomeObject : public HomeObjectImpl {
// | pg_members[0] | pg_members[1] | ... | pg_members[num_members-1] |
// Immediately followed by an array of 'chunk_num_t' values (representing r_chunk_ids):
// | chunk_num_t[0] | chunk_num_t[1] | ... | chunk_num_t[num_chunks-1] |
// Here, 'chunk_num_t[i]' represents the r_chunk_id for the v_chunk_id 'i', where v_chunk_id starts from 0 and increases sequentially.
// Here, 'chunk_num_t[i]' represents the r_chunk_id for the v_chunk_id 'i', where v_chunk_id starts from 0 and
// increases sequentially.


uint32_t size() const { return sizeof(pg_info_superblk) - sizeof(char) + num_members * sizeof(pg_members) + num_chunks * sizeof(homestore::chunk_num_t); }
// Total serialized size of this superblk: the fixed struct minus the one-byte
// flexible-array placeholder ('data'), plus the trailing pg_members array and
// the trailing r_chunk_id (chunk_num_t) array described in the layout comment above.
uint32_t size() const {
    return sizeof(pg_info_superblk) - sizeof(char) + num_members * sizeof(pg_members) +
        num_chunks * sizeof(homestore::chunk_num_t);
}
static std::string name() { return _pg_meta_name; }

pg_info_superblk() = default;
Expand All @@ -117,11 +120,15 @@ class HSHomeObject : public HomeObjectImpl {

void copy(pg_info_superblk const& rhs) { *this = rhs; }

pg_members* get_pg_members_mutable() { return reinterpret_cast<pg_members*>(data); }
const pg_members* get_pg_members() const { return reinterpret_cast<const pg_members*>(data); }
pg_members* get_pg_members_mutable() { return reinterpret_cast< pg_members* >(data); }
const pg_members* get_pg_members() const { return reinterpret_cast< const pg_members* >(data); }

homestore::chunk_num_t* get_chunk_ids_mutable() { return reinterpret_cast<homestore::chunk_num_t*>(data + num_members * sizeof(pg_members)); }
const homestore::chunk_num_t* get_chunk_ids() const { return reinterpret_cast<const homestore::chunk_num_t*>(data + num_members * sizeof(pg_members)); }
homestore::chunk_num_t* get_chunk_ids_mutable() {
return reinterpret_cast< homestore::chunk_num_t* >(data + num_members * sizeof(pg_members));
}
const homestore::chunk_num_t* get_chunk_ids() const {
return reinterpret_cast< const homestore::chunk_num_t* >(data + num_members * sizeof(pg_members));
}
};

struct DataHeader {
Expand Down Expand Up @@ -214,7 +221,8 @@ class HSHomeObject : public HomeObjectImpl {
std::shared_ptr< BlobIndexTable > index_table_;
PGMetrics metrics_;

HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table, std::shared_ptr< const std::vector <homestore::chunk_num_t> > pg_chunk_ids);
HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table,
std::shared_ptr< const std::vector< homestore::chunk_num_t > > pg_chunk_ids);
HS_PG(homestore::superblk< pg_info_superblk >&& sb, shared< homestore::ReplDev > rdev);
~HS_PG() override = default;

Expand Down Expand Up @@ -447,6 +455,8 @@ class HSHomeObject : public HomeObjectImpl {
*/
std::tuple< bool, bool, homestore::chunk_num_t > get_any_chunk_id(pg_id_t pg);

bool pg_exists(pg_id_t pg_id) const;

cshared< HeapChunkSelector > chunk_selector() const { return chunk_selector_; }

// Blob manager related.
Expand Down
12 changes: 6 additions & 6 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
return;
}
auto pg_id = pg_sb->id;
std::vector<chunk_num_t> chunk_ids(pg_sb->get_chunk_ids(), pg_sb->get_chunk_ids() + pg_sb->num_chunks);
std::vector< chunk_num_t > chunk_ids(pg_sb->get_chunk_ids(), pg_sb->get_chunk_ids() + pg_sb->num_chunks);
chunk_selector_->set_pg_chunks(pg_id, std::move(chunk_ids));
auto uuid_str = boost::uuids::to_string(pg_sb->index_table_uuid);
auto hs_pg = std::make_unique< HS_PG >(std::move(pg_sb), std::move(v.value()));
Expand All @@ -302,9 +302,7 @@ void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_c
add_pg_to_map(std::move(hs_pg));
}

void HSHomeObject::on_pg_meta_blk_recover_completed(bool success) {
chunk_selector_->recover_per_dev_chunk_heap();
}
void HSHomeObject::on_pg_meta_blk_recover_completed(bool success) { chunk_selector_->recover_per_dev_chunk_heap(); }

PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb) {
PGInfo pginfo{sb->id};
Expand All @@ -317,15 +315,17 @@ PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superbl
return pginfo;
}

HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table, std::shared_ptr< const std::vector <chunk_num_t> > pg_chunk_ids) :
HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table,
std::shared_ptr< const std::vector< chunk_num_t > > pg_chunk_ids) :
PG{std::move(info)},
pg_sb_{_pg_meta_name},
repl_dev_{std::move(rdev)},
index_table_{std::move(index_table)},
metrics_{*this} {
RELEASE_ASSERT(pg_chunk_ids != nullptr, "PG chunks null");
const uint32_t num_chunks = pg_chunk_ids->size();
pg_sb_.create(sizeof(pg_info_superblk) - sizeof(char) + pg_info_.members.size() * sizeof(pg_members)+ num_chunks * sizeof(homestore::chunk_num_t));
pg_sb_.create(sizeof(pg_info_superblk) - sizeof(char) + pg_info_.members.size() * sizeof(pg_members) +
num_chunks * sizeof(homestore::chunk_num_t));
pg_sb_->id = pg_info_.id;
pg_sb_->num_members = pg_info_.members.size();
pg_sb_->num_chunks = num_chunks;
Expand Down
5 changes: 4 additions & 1 deletion src/lib/homestore_backend/index_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ void HSHomeObject::print_btree_index(pg_id_t pg_id) {
shared< BlobIndexTable > HSHomeObject::get_index_table(pg_id_t pg_id) {
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
if (iter == _pg_map.end()) {
LOGW("PG not found for pg_id={} when getting inde table", pg_id);
return nullptr;
}
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get());
RELEASE_ASSERT(hs_pg->index_table_ != nullptr, "Index table not found for PG");
return hs_pg->index_table_;
Expand Down
22 changes: 21 additions & 1 deletion src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "generated/resync_pg_shard_generated.h"
#include "generated/resync_blob_data_generated.h"
#include <homestore/replication/repl_dev.h>
#include <homestore/replication/repl_decls.h>

namespace homeobject {
void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
Expand Down Expand Up @@ -40,6 +42,16 @@ bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const& heade
// For shard creation, since homestore repldev inside will write shard header to data service first before this
// function is called. So there is nothing is needed to do and we can get the binding chunk_id with the newly shard
// from the blkid in on_commit()
if (ctx->op_code() == homestore::journal_type_t::HS_CTRL_REPLACE) {
LOGI("pre_commit replace member log entry, lsn:{}", lsn);
return true;
}

if (ctx->op_code() == homestore::journal_type_t::HS_CTRL_DESTROY) {
LOGI("pre_commit destroy member log entry, lsn:{}", lsn);
return true;
}

const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
if (msg_header->corrupted()) {
LOGE("corrupted message in pre_commit, lsn:{}", lsn);
Expand Down Expand Up @@ -121,9 +133,17 @@ ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t
const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes());
switch (msg_header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
auto& pg_id = msg_header->pg_id;
// check whether the pg exists
if (!home_object_->pg_exists(pg_id)) {
LOGI("can not find pg {} when getting blk_alloc_hint", pg_id);
// TODO:: add error code to indicate the pg not found in homestore side
return folly::makeUnexpected(homestore::ReplServiceError::NO_SPACE_LEFT);
}
// Since chunks are selected when a pg is created, the chunkselector selects one of the chunks owned by the pg
homestore::blk_alloc_hints hints;
hints.pdev_id_hint = msg_header->pg_id; // FIXME @Hooper: Temporary bypass using pdev_id_hint to represent pg_id_hint, "identical layout" will change it
hints.pdev_id_hint = pg_id; // FIXME @Hooper: Temporary bypass using pdev_id_hint to represent
// pg_id_hint, "identical layout" will change it
return hints;
}

Expand Down
9 changes: 9 additions & 0 deletions src/lib/homestore_backend/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,16 @@ target_link_libraries(homestore_tests
${COMMON_TEST_DEPS}
)


add_library(homestore_tests_dynamic OBJECT)
target_sources(homestore_tests_dynamic PRIVATE test_homestore_backend_dynamic.cpp)
target_link_libraries(homestore_tests_dynamic
homeobject_homestore
${COMMON_TEST_DEPS}
)

add_executable (test_heap_chunk_selector)
target_sources(test_heap_chunk_selector PRIVATE test_heap_chunk_selector.cpp ../heap_chunk_selector.cpp)
target_link_libraries(test_heap_chunk_selector homestore::homestore ${COMMON_TEST_DEPS})
add_test(NAME HeapChunkSelectorTest COMMAND test_heap_chunk_selector)

76 changes: 61 additions & 15 deletions src/lib/homestore_backend/tests/homeobj_fixture.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,18 +55,36 @@ class HomeObjectFixture : public ::testing::Test {
std::this_thread::sleep_for(std::chrono::seconds(5));
}

// schedule create_pg to replica_num
void create_pg(pg_id_t pg_id, uint32_t replica_num = 0) {
if (replica_num == g_helper->replica_num()) {
auto memebers = g_helper->members();
auto name = g_helper->name();
/**
* \brief create pg with a given id.
*
* \param pg_id pg id that will be newly created.
* \param leader_replica_num the replica number that will be the initial leader of repl dev of this pg
* \param excluding_replicas_in_pg the set of replicas that will be excluded in the initial members of this pg. this
* means all the started replicas that are not in this set will be the initial members of this pg.
*/
void create_pg(pg_id_t pg_id, uint8_t leader_replica_num = 0,
std::optional< std::unordered_set< uint8_t > > excluding_replicas_in_pg = std::nullopt) {
std::unordered_set< uint8_t > excluding_pg_replicas;
if (excluding_replicas_in_pg.has_value()) excluding_pg_replicas = excluding_replicas_in_pg.value();
if (excluding_pg_replicas.contains(leader_replica_num))
RELEASE_ASSERT(false, "fail to create pg, leader_replica_num {} is excluded in the pg", leader_replica_num);

auto my_replica_num = g_helper->replica_num();
if (excluding_pg_replicas.contains(my_replica_num)) return;

auto pg_size = SISL_OPTIONS["pg_size"].as< uint64_t >() * Mi;
auto name = g_helper->test_name();

if (leader_replica_num == my_replica_num) {
auto members = g_helper->members();
auto info = homeobject::PGInfo(pg_id);
info.size = 50 * Mi;
for (const auto& member : memebers) {
if (replica_num == member.second) {
info.size = pg_size;
for (const auto& member : members) {
if (leader_replica_num == member.second) {
// by default, leader is the first member
info.members.insert(homeobject::PGMember{member.first, name + std::to_string(member.second), 1});
} else {
} else if (!excluding_pg_replicas.contains(member.second)) {
info.members.insert(homeobject::PGMember{member.first, name + std::to_string(member.second), 0});
}
}
Expand All @@ -84,6 +102,7 @@ class HomeObjectFixture : public ::testing::Test {

ShardInfo create_shard(pg_id_t pg_id, uint64_t size_bytes) {
g_helper->sync();
if (!am_i_in_pg(pg_id)) return {};
// schedule create_shard only on leader
run_on_pg_leader(pg_id, [&]() {
auto s = _obj_inst->shard_manager()->create_shard(pg_id, size_bytes).get();
Expand Down Expand Up @@ -113,10 +132,9 @@ class HomeObjectFixture : public ::testing::Test {
}

ShardInfo seal_shard(shard_id_t shard_id) {
// before seal shard, we need to wait for all the members to complete shard state verification
g_helper->sync();
auto r = _obj_inst->shard_manager()->get_shard(shard_id).get();
RELEASE_ASSERT(!!r, "failed to get shard {}", shard_id);
if (!r) return {};
auto pg_id = r.value().placement_group;

run_on_pg_leader(pg_id, [&]() {
Expand All @@ -135,10 +153,10 @@ class HomeObjectFixture : public ::testing::Test {
}
}

void put_blob(shard_id_t shard_id, Blob&& blob) {
void put_blob(shard_id_t shard_id, Blob&& blob, bool need_sync_before_start = true) {
g_helper->sync();
auto r = _obj_inst->shard_manager()->get_shard(shard_id).get();
RELEASE_ASSERT(!!r, "failed to get shard {}", shard_id);
if (!r) return;
auto pg_id = r.value().placement_group;

run_on_pg_leader(pg_id, [&]() {
Expand All @@ -160,9 +178,11 @@ class HomeObjectFixture : public ::testing::Test {

// TODO:make this run in parallel
void put_blobs(std::map< pg_id_t, std::vector< shard_id_t > > const& pg_shard_id_vec,
uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) {
uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id,
bool need_sync_before_start = true) {
g_helper->sync();
for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) {
if (!am_i_in_pg(pg_id)) continue;
// the blob_id of a pg is a continuous number starting from 0 and increasing by 1
blob_id_t current_blob_id{pg_blob_id[pg_id]};

Expand Down Expand Up @@ -195,6 +215,7 @@ class HomeObjectFixture : public ::testing::Test {
pg_blob_id[pg_id] = current_blob_id;
auto last_blob_id = pg_blob_id[pg_id] - 1;
while (!blob_exist(shard_id, last_blob_id)) {
LOGINFO("waiting for pg_id {} blob {} to be created locally", pg_id, last_blob_id);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
LOGINFO("shard {} blob {} is created locally, which means all the blob before {} are created", shard_id,
Expand All @@ -207,6 +228,7 @@ class HomeObjectFixture : public ::testing::Test {
uint64_t const num_blobs_per_shard, std::map< pg_id_t, blob_id_t >& pg_blob_id) {
g_helper->sync();
for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) {
if (!am_i_in_pg(pg_id)) continue;
run_on_pg_leader(pg_id, [&]() {
blob_id_t current_blob_id{0};
for (; current_blob_id < pg_blob_id[pg_id];) {
Expand Down Expand Up @@ -235,6 +257,7 @@ class HomeObjectFixture : public ::testing::Test {
uint32_t off = 0, len = 0;

for (const auto& [pg_id, shard_vec] : pg_shard_id_vec) {
if (!am_i_in_pg(pg_id)) continue;
blob_id_t current_blob_id{0};
for (const auto& shard_id : shard_vec) {
for (uint64_t k = 0; k < num_blobs_per_shard; k++) {
Expand Down Expand Up @@ -272,6 +295,7 @@ class HomeObjectFixture : public ::testing::Test {
uint32_t exp_tombstone_blobs = deleted_all ? shards_per_pg * blobs_per_shard : 0;

for (uint32_t i = 1; i <= num_pgs; ++i) {
if (!am_i_in_pg(i)) continue;
PGStats stats;
_obj_inst->pg_manager()->get_stats(i, stats);
ASSERT_EQ(stats.num_active_objects, exp_active_blobs)
Expand Down Expand Up @@ -330,11 +354,33 @@ class HomeObjectFixture : public ::testing::Test {
// Runs 'lambda' only on the replica that is currently the leader of 'pg_id'.
// If PG stats cannot be fetched (e.g. this replica is not a member of the pg),
// silently does nothing — non-member replicas simply skip the operation.
void run_on_pg_leader(pg_id_t pg_id, auto&& lambda) {
    PGStats pg_stats;
    auto res = _obj_inst->pg_manager()->get_stats(pg_id, pg_stats);
    if (!res) return;
    if (g_helper->my_replica_id() == pg_stats.leader_id) { lambda(); }
    // TODO: add logic for check and retry of leader change if necessary
}

// Runs 'lambda' only if this replica is a member of the given pg; no-op otherwise.
void run_if_in_pg(pg_id_t pg_id, auto&& lambda) {
    if (am_i_in_pg(pg_id)) lambda();
}

// Returns true iff this replica's id appears in the member list of the given pg.
// A failed stats fetch (e.g. the pg is unknown on this node) is treated as
// "not a member" rather than an error.
bool am_i_in_pg(pg_id_t pg_id) {
    PGStats stats;
    if (!_obj_inst->pg_manager()->get_stats(pg_id, stats)) return false;
    const auto my_id = g_helper->my_replica_id();
    for (auto it = stats.members.begin(); it != stats.members.end(); ++it) {
        if (std::get< 0 >(*it) == my_id) return true;
    }
    return false;
}

// wait for the last blob to be created locally, which means all the blob before this blob are created
// Polls once per second until the given blob is visible locally. Because blob
// ids within a pg increase sequentially, seeing this blob implies every earlier
// blob of the shard has already been applied.
void wait_for_all(shard_id_t shard_id, blob_id_t blob_id) {
    while (!blob_exist(shard_id, blob_id)) {
        std::this_thread::sleep_for(1s);
    }
}

private:
bool pg_exist(pg_id_t pg_id) {
std::vector< pg_id_t > pg_ids;
Expand Down
Loading
Loading