Merge pull request #236 from yuwmao/baseline_resync
Fix some bugs in baseline resync
koujl authored Dec 9, 2024
2 parents 2726477 + 55efbde commit 1f5b242
Showing 12 changed files with 77 additions and 44 deletions.
4 changes: 2 additions & 2 deletions conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.13"
version = "2.1.17"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
@@ -49,7 +49,7 @@ def build_requirements(self):

def requirements(self):
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("homestore/[^6.4]@oss/master")
self.requires("homestore/[^6.5.19]@oss/master")
self.requires("iomgr/[^11.3]@oss/master")
self.requires("lz4/1.9.4", override=True)
self.requires("openssl/3.3.1", override=True)
12 changes: 12 additions & 0 deletions src/lib/homestore_backend/heap_chunk_selector.h
@@ -151,6 +151,18 @@ class HeapChunkSelector : public homestore::ChunkSelector {

uint32_t get_chunk_size() const;

/**
* @brief Returns the number of disks seen by the chunk selector.
*
* Warning: calling this before HomeStore has fully started may return an incorrect result.
*
* This function returns the number of disks the chunk selector has seen so far.
* It should be the authoritative source for how many data disks are in the system.
* If a disk is down in degraded mode, it will not be loaded and none of its chunks
* will be added to the selector.
*/
uint32_t total_disks() const { return m_per_dev_heap.size(); }

private:
void add_chunk_internal(const chunk_num_t, bool add_to_heap = true);

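Reviewer note: a minimal sketch of how a caller might use the new accessor; the expected_data_disks value and the call site are illustrative assumptions, not part of this commit.

// Sketch only (assumption): compare the number of disks the selector actually loaded
// against the number expected from configuration to detect a degraded start,
// since a down disk is skipped and contributes no chunks to the selector.
uint32_t const seen = chunk_selector()->total_disks();
if (seen < expected_data_disks) { // expected_data_disks is a hypothetical config value
    LOGW("running degraded: only {} of {} data disks were loaded", seen, expected_data_disks);
}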
4 changes: 2 additions & 2 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -101,7 +101,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s

if (!repl_dev->is_leader()) {
LOGW("failed to put blob for pg [{}], shard [{}], not leader", pg_id, shard.id);
return folly::makeUnexpected(BlobErrorCode::NOT_LEADER);
return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id()));
}

// Create a put_blob request which allocates for header, key and blob_header, user_key. Data sgs are added later
@@ -385,7 +385,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo

if (!repl_dev->is_leader()) {
LOGW("failed to del blob for pg [{}], shard [{}], blob_id [{}], not leader", pg_id, shard.id, blob_id);
return folly::makeUnexpected(BlobErrorCode::NOT_LEADER);
return folly::makeUnexpected(BlobError(BlobErrorCode::NOT_LEADER, repl_dev->get_leader_id()));
}

// Create an unaligned header request unaligned
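Reviewer note: with the leader id now attached to NOT_LEADER errors, a client can redirect instead of blindly retrying. A hedged sketch follows; current_leader and retry_on_replica are assumed names for illustration, not taken from this diff.

// Sketch only: result follows the folly::Expected pattern used elsewhere in this diff
// (result.hasError() / result.error().code). The member holding the leader id is assumed
// to be named current_leader; adjust to whatever BlobError actually exposes.
if (result.hasError() && result.error().code == BlobErrorCode::NOT_LEADER) {
    auto const leader = result.error().current_leader; // populated from repl_dev->get_leader_id()
    LOGI("redirecting request to current leader {}", boost::uuids::to_string(leader));
    retry_on_replica(leader); // hypothetical helper that re-issues the request
}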
10 changes: 8 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.cpp
@@ -60,6 +60,10 @@ class HSReplApplication : public homestore::ReplApplication {
return it->second;
}

void on_repl_devs_init_completed() override {
_home_object->on_replica_restart();
}

std::pair< std::string, uint16_t > lookup_peer(homestore::replica_id_t uuid) const override {
std::string endpoint;
// for folly::uri to parse correctly, we need to add "http://" prefix
@@ -195,13 +199,14 @@ void HSHomeObject::init_homestore() {
.chunk_sel_type = chunk_selector_type_t::CUSTOM}},
});
}

// We don't have any repl dev yet; explicitly call init_completed_cb(), where we register PG/Shard meta types.
repl_app->on_repl_devs_init_completed();
// Create a superblock that contains our SvcId
auto svc_sb = homestore::superblk< svc_info_superblk_t >(_svc_meta_name);
svc_sb.create(sizeof(svc_info_superblk_t));
svc_sb->svc_id_ = _our_id;
svc_sb.write();
on_replica_restart();

} else {
RELEASE_ASSERT(!_our_id.is_nil(), "No SvcId read after HomeStore recovery!");
auto const new_id = app->discover_svcid(_our_id);
@@ -313,6 +318,7 @@ HomeObjectStats HSHomeObject::_get_stats() const {

stats.num_open_shards = num_open_shards;
stats.avail_open_shards = chunk_selector()->total_chunks() - num_open_shards;
stats.num_disks = chunk_selector()->total_disks();
return stats;
}

4 changes: 2 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -328,8 +328,8 @@ class HSHomeObject : public HomeObjectImpl {

std::vector< ShardEntry > shard_list_{0};

objId cur_obj_id_{1, 0};
uint64_t cur_shard_idx_{0};
objId cur_obj_id_{0, 0};
int64_t cur_shard_idx_{-1};
std::vector<BlobInfo> cur_blob_list_{0};
uint64_t cur_start_blob_idx_{0};
uint64_t cur_batch_blob_count_{0};
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -64,7 +64,7 @@ objId HSHomeObject::PGBlobIterator::expected_next_obj_id() {
return objId(cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id + 1);
}
//next shard
if (cur_shard_idx_ < shard_list_.size() - 1) {
if (cur_shard_idx_ < static_cast< int64_t >(shard_list_.size() - 1)) {
auto next_shard_seq_num = shard_list_[cur_shard_idx_ + 1].info.id & 0xFFFFFFFFFFFF;
return objId(next_shard_seq_num, 0);
}
@@ -103,7 +103,7 @@ bool HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& m
shard_ids.push_back(shard.info.id);
}

auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg->durable_entities().blob_sequence_num,
auto pg_entry = CreateResyncPGMetaDataDirect(builder_, pg_info.id, &uuid, pg_info.size, pg_info.chunk_size, pg->durable_entities().blob_sequence_num,
pg->shard_sequence_num_, &members, &shard_ids);
builder_.FinishSizePrefixed(pg_entry);

@@ -212,7 +212,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
}

sisl::io_blob_safe blob;
auto retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
for (int i = 0; i < retries; i++) {
auto result = load_blob_data(info, state).get();
if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
@@ -241,7 +241,7 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
builder_.FinishSizePrefixed(
CreateResyncBlobDataBatchDirect(builder_, &blob_entries, end_of_shard));

LOGD("create blobs snapshot data batch: shard_id={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
LOGD("create blobs snapshot data batch: shard_seq_num={}, batch_num={}, total_bytes={}, blob_num={}, end_of_shard={}",
cur_obj_id_.shard_seq_num, cur_obj_id_.batch_id, total_bytes, blob_entries.size(), end_of_shard);

pack_resync_message(data_blob, SyncMessageType::SHARD_BATCH);
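Reviewer note: the iterator code above derives a 48-bit shard sequence number from a shard id (info.id & 0xFFFFFFFFFFFF) and packs it together with a batch number into a single objId offset. A self-contained sketch of that packing follows; the exact bit split (48/16) and the struct layout are assumptions for illustration, not copied from the repo.

#include <cstdint>
#include <iostream>

// Assumed layout: high 48 bits = shard sequence number, low 16 bits = batch id.
struct ObjIdSketch {
    uint64_t shard_seq_num; // assumed 48-bit shard sequence number
    uint64_t batch_id;      // assumed 16-bit batch counter within a shard

    uint64_t value() const { return (shard_seq_num << 16) | (batch_id & 0xFFFFu); }
    static ObjIdSketch from_value(uint64_t v) { return {v >> 16, v & 0xFFFFu}; }
};

int main() {
    ObjIdSketch const first_batch{42, 0};                             // batch 0 of shard sequence 42
    auto const round_trip = ObjIdSketch::from_value(first_batch.value());
    std::cout << round_trip.shard_seq_num << " " << round_trip.batch_id << "\n"; // prints "42 0"
    return 0;
}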
58 changes: 31 additions & 27 deletions src/lib/homestore_backend/replication_state_machine.cpp
@@ -92,7 +92,7 @@ void ReplicationStateMachine::on_rollback(int64_t lsn, sisl::blob const& header,
}
}

void ReplicationStateMachine::on_restart() { home_object_->on_replica_restart(); }
void ReplicationStateMachine::on_restart() { LOGD("ReplicationStateMachine::on_restart");}

void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& header, const sisl::blob& key,
cintrusive< repl_req_ctx >& ctx) {
@@ -231,16 +231,18 @@ std::shared_ptr< homestore::snapshot_context > ReplicationStateMachine::last_sna
return snp;
}

int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) {
int ReplicationStateMachine::read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) {
HSHomeObject::PGBlobIterator* pg_iter = nullptr;
auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();

if (snp_data->user_ctx == nullptr) {
if (snp_obj->user_ctx == nullptr) {
// Create the pg blob iterator for the first time.
pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id(), context->get_lsn());
snp_data->user_ctx = (void*)pg_iter;
} else { pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); }
snp_obj->user_ctx = (void*)pg_iter;
} else {
pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_obj->user_ctx);
}

// Nuraft uses obj_id as a way to track the state of the snapshot read and write.
// Nuraft starts with obj_id == 0 as first message always, leader send all the shards and
@@ -258,16 +260,17 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap
boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(),
s->get_last_log_idx());
//TODO snp_data->offset is int64, need to change to uint64 in homestore
if (snp_data->offset == int64_t(LAST_OBJ_ID)) {
if (snp_obj->offset == LAST_OBJ_ID) {
// No more shards to read, baseline resync is finished after this.
snp_data->is_last_obj = true;
snp_obj->is_last_obj = true;
LOGD("Read snapshot end, {}", log_str);
return 0;
}

auto obj_id = objId(snp_data->offset);
auto obj_id = objId(snp_obj->offset);
log_str = fmt::format("{} shard_seq_num={} batch_num={}", log_str, obj_id.shard_seq_num, obj_id.batch_id);

LOGD("read current snp obj {}", log_str)
//invalid Id
if (!pg_iter->update_cursor(obj_id)) {
LOGW("Invalid objId in snapshot read, {}, current shard_seq_num={}, current batch_num={}",
@@ -278,7 +281,7 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap
//pg metadata message
//shardId starts from 1
if (obj_id.shard_seq_num == 0) {
if (!pg_iter->create_pg_snapshot_data(snp_data->blob)) {
if (!pg_iter->create_pg_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create pg snapshot data for snapshot read, {}", log_str);
return -1;
}
@@ -290,54 +293,54 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap
LOGE("Failed to generate shard blob list for snapshot read, {}", log_str);
return -1;
}
if (!pg_iter->create_shard_snapshot_data(snp_data->blob)) {
if (!pg_iter->create_shard_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create shard meta data for snapshot read, {}", log_str);
return -1;
}
return 0;
}
//general blob message
if (!pg_iter->create_blobs_snapshot_data(snp_data->blob)) {
if (!pg_iter->create_blobs_snapshot_data(snp_obj->blob)) {
LOGE("Failed to create blob batch data for snapshot read, {}", log_str);
return -1;
}
return 0;
}

void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) {
void ReplicationStateMachine::write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) {
RELEASE_ASSERT(context != nullptr, "Context null");
RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null");
RELEASE_ASSERT(snp_obj != nullptr, "Snapshot data null");
auto r_dev = repl_dev();
if (!m_snp_rcv_handler) {
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
}

auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset));
auto obj_id = objId(static_cast< snp_obj_id_t >(snp_obj->offset));
auto log_suffix = fmt::format("group={} term={} lsn={} shard={} batch_num={} size={}",
uuids::to_string(r_dev->group_id()), s->get_last_log_term(), s->get_last_log_idx(),
obj_id.shard_seq_num, obj_id.batch_id, snp_data->blob.size());
obj_id.shard_seq_num, obj_id.batch_id, snp_obj->blob.size());

if (snp_data->is_last_obj) {
if (snp_obj->is_last_obj) {
LOGD("Write snapshot reached is_last_obj true {}", log_suffix);
return;
}

// Check message integrity
// TODO: add a flip here to simulate corrupted message
auto header = r_cast< const SyncMessageHeader* >(snp_data->blob.cbytes());
auto header = r_cast< const SyncMessageHeader* >(snp_obj->blob.cbytes());
if (header->corrupted()) {
LOGE("corrupted message in write_snapshot_data, lsn:{}, obj_id {} shard {} batch {}", s->get_last_log_idx(),
obj_id.value, obj_id.shard_seq_num, obj_id.batch_id);
return;
}
if (auto payload_size = snp_data->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) {
if (auto payload_size = snp_obj->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) {
LOGE("payload size mismatch in write_snapshot_data {} != {}, lsn:{}, obj_id {} shard {} batch {}", payload_size,
header->payload_size, s->get_last_log_idx(), obj_id.value, obj_id.shard_seq_num, obj_id.batch_id);
return;
}
auto data_buf = snp_data->blob.cbytes() + sizeof(SyncMessageHeader);
auto data_buf = snp_obj->blob.cbytes() + sizeof(SyncMessageHeader);

if (obj_id.shard_seq_num == 0) {
// PG metadata & shard list message
@@ -348,7 +351,8 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
// Check if the snapshot context is same as the current snapshot context.
// If not, drop the previous context and re-init a new one
if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
m_snp_rcv_handler->reset_context(pg_data->pg_id(), context->get_lsn());
LOGI("reset context from lsn:{} to lsn:{}", m_snp_rcv_handler->get_context_lsn(), context->get_lsn());
m_snp_rcv_handler->reset_context(context->get_lsn(), pg_data->pg_id());
// TODO: Reset all data of current PG - let's resync on a pristine base
}

@@ -359,7 +363,7 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
obj_id.value, obj_id.shard_seq_num, obj_id.batch_id, ret);
return;
}
snp_data->offset =
snp_obj->offset =
objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_next_shard()), 0).value;
LOGD("Write snapshot, processed PG data pg_id:{} {}", pg_data->pg_id(), log_suffix);
return;
@@ -380,7 +384,7 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
return;
}
// Request for the next batch
snp_data->offset = objId(obj_id.shard_seq_num, 1).value;
snp_obj->offset = objId(obj_id.shard_seq_num, 1).value;
LOGD("Write snapshot, processed shard data shard_seq_num:{} {}", obj_id.shard_seq_num, log_suffix);
return;
}
@@ -403,12 +407,12 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
if (blob_batch->is_last_batch()) {
auto next_shard = m_snp_rcv_handler->get_next_shard();
if (next_shard == HSHomeObject::SnapshotReceiveHandler::shard_list_end_marker) {
snp_data->offset = LAST_OBJ_ID;
snp_obj->offset = LAST_OBJ_ID;
} else {
snp_data->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value;
snp_obj->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value;
}
} else {
snp_data->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value;
snp_obj->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value;
}

LOGD("Write snapshot, processed blob data shard_seq_num:{} batch_num:{} {}", obj_id.shard_seq_num, obj_id.batch_id,
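Reviewer note: a compact sketch of how each snapshot object is classified in read_snapshot_obj/write_snapshot_obj above (shard_seq_num 0 carries PG metadata plus the shard list, batch 0 of a shard carries shard metadata, later batches carry blobs, and LAST_OBJ_ID ends the resync). The enum and function names are illustrative assumptions, not repo code.

#include <cstdint>

enum class SnapObjKind { PgMeta, ShardMeta, BlobBatch, Done };

// Sketch only: mirrors the branching in the state machine above.
SnapObjKind classify(uint64_t shard_seq_num, uint64_t batch_id, bool is_last_obj) {
    if (is_last_obj) return SnapObjKind::Done;            // offset == LAST_OBJ_ID, resync finished
    if (shard_seq_num == 0) return SnapObjKind::PgMeta;   // obj_id 0: PG metadata + shard list
    if (batch_id == 0) return SnapObjKind::ShardMeta;     // batch 0 of a shard: shard metadata
    return SnapObjKind::BlobBatch;                        // batch >= 1: blob data batches
}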
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/replication_state_machine.hpp
@@ -180,10 +180,10 @@ class ReplicationStateMachine : public homestore::ReplDevListener {
homestore::AsyncReplResult<> create_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual bool apply_snapshot(std::shared_ptr< homestore::snapshot_context > context) override;
virtual std::shared_ptr< homestore::snapshot_context > last_snapshot() override;
virtual int read_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) override;
virtual void write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_data > snp_data) override;
virtual int read_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
virtual void write_snapshot_obj(std::shared_ptr< homestore::snapshot_context > context,
std::shared_ptr< homestore::snapshot_obj > snp_obj) override;
virtual void free_user_snp_ctx(void*& user_snp_ctx) override;

private:
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/resync_pg_data.fbs
@@ -11,6 +11,8 @@ table Member {
table ResyncPGMetaData {
pg_id : uint16; // only low 16 bit is used for pg_id;
replica_set_uuid : [ubyte]; // uuid of replica set
pg_size : uint64; // pg size;
chunk_size : uint64; // chunk size;
blob_seq_num : uint64; // blob sequence number, used to assign next blob id;
shard_seq_num: uint64; // shard sequence number, used to assign next shard id;
members : [Member]; // peers;
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
@@ -21,6 +21,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD

// Create local PG
PGInfo pg_info(pg_meta.pg_id());
pg_info.size = pg_meta.pg_size();
pg_info.chunk_size = pg_meta.chunk_size();
std::copy_n(pg_meta.replica_set_uuid()->data(), 16, pg_info.replica_set_uuid.begin());
for (unsigned int i = 0; i < pg_meta.members()->size(); i++) {
const auto member = pg_meta.members()->Get(i);
7 changes: 6 additions & 1 deletion src/lib/homestore_backend/tests/hs_blob_tests.cpp
@@ -196,6 +196,8 @@ TEST_F(HomeObjectFixture, PGBlobIterator) {
auto u1 = pg_msg->replica_set_uuid();
auto u2 = pg->pg_info_.replica_set_uuid;
ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
ASSERT_EQ(pg_msg->pg_size(), pg->pg_info_.size);
ASSERT_EQ(pg_msg->chunk_size(), pg->pg_info_.chunk_size);
ASSERT_EQ(pg_msg->blob_seq_num(), pg->durable_entities().blob_sequence_num.load());
ASSERT_EQ(pg_msg->shard_seq_num(), pg->shard_sequence_num_);

@@ -300,6 +302,9 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
// We have to create a PG first to init repl_dev
constexpr pg_id_t pg_id = 1;
create_pg(pg_id); // to create repl dev
auto iter = _obj_inst->_pg_map.find(pg_id);
ASSERT_TRUE(iter != _obj_inst->_pg_map.end());
auto pg = iter->second.get();
PGStats stats;
ASSERT_TRUE(_obj_inst->pg_manager()->get_stats(pg_id, stats));
auto r_dev = homestore::HomeStore::instance()->repl_service().get_repl_dev(stats.replica_set_uuid);
@@ -324,7 +329,7 @@ TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {
shard_ids.push_back(i);
}
auto pg_entry =
CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
CreateResyncPGMetaDataDirect(builder, pg_id, &uuid, pg->pg_info_.size, pg->pg_info_.chunk_size, blob_seq_num, num_shards_per_pg, &members, &shard_ids);
builder.Finish(pg_entry);
auto pg_meta = GetResyncPGMetaData(builder.GetBufferPointer());
auto ret = handler->process_pg_snapshot_data(*pg_meta);
2 changes: 2 additions & 0 deletions src/lib/homestore_backend/tests/test_heap_chunk_selector.cpp
@@ -152,6 +152,8 @@ TEST_F(HeapChunkSelectorTest, test_for_each_chunk) {
ASSERT_EQ(size.load(), 18);
}

TEST_F(HeapChunkSelectorTest, test_total_disks) { ASSERT_EQ(HCS.total_disks(), 3); }

TEST_F(HeapChunkSelectorTest, test_identical_layout) {
const homestore::blk_count_t count = 1;
homestore::blk_alloc_hints hints;
