Skip to content

Commit

Permalink
Add framework for snapshot resync
Browse files Browse the repository at this point in the history
- pg_blob_iterator is the snapshot resync context for leader(read path)
- SnapshotReceiveHandler is a newly added structure as the snapshot context for follower(write path)
- Comment previous implementation codes
  • Loading branch information
yuwmao committed Nov 1, 2024
1 parent 06de30c commit 3c22266
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 29 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.6"
version = "2.1.7"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
37 changes: 35 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "replication_message.hpp"
#include "homeobject/common.hpp"
#include "index_kv.hpp"
#include "generated/resync_pg_data_generated.h"
#include "generated/resync_shard_data_generated.h"
#include "generated/resync_blob_data_generated.h"

namespace homestore {
struct meta_blk;
Expand Down Expand Up @@ -124,6 +127,8 @@ class HSHomeObject : public HomeObjectImpl {
ShardInfo info;
homestore::chunk_num_t chunk_id;
};

struct snapshot_info_superblk {};
#pragma pack()

public:
Expand Down Expand Up @@ -287,12 +292,18 @@ class HSHomeObject : public HomeObjectImpl {
int64_t get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
std::vector< HSHomeObject::BlobInfoData >& blob_vec, bool& end_of_shard);
void create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_pg_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_blobs_snapshot_data(std::vector< HSHomeObject::BlobInfoData >& blob_vec,
sisl::io_blob_safe& data_blob, bool end_of_shard);
bool end_of_scan() const;

uint64_t cur_shard_seq_num_{1};
int64_t cur_blob_id_{-1};
uint64_t snp_start_lsn{0};
std::vector<ShardInfo> shard_list{0};
shard_id_t cur_shard_seq_num_{1};
std::vector<BlobInfo> cur_blob_list{0};
int64_t last_end_blob_idx{-1};
int64_t last_batch_size{0};
uint64_t max_shard_seq_num_{0};
uint64_t cur_snapshot_batch_num{0};
HSHomeObject& home_obj_;
Expand All @@ -301,6 +312,28 @@ class HSHomeObject : public HomeObjectImpl {
shared< homestore::ReplDev > repl_dev_;
};

// SnapshotReceiverContext is the context used in follower side snapshot receiving. [drafting] The functions is not the final version.
struct SnapshotReceiveHandler {
SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id);
void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta);
void process_shard_snapshot_data(ResyncShardMetaData const& shard_meta, snp_obj_id_t& obj_id);
void process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blob, snp_obj_id_t& obj_id);

shard_id_t shard_cursor{0};
blob_id_t blob_cursor{0};
snp_batch_id_t cur_batch_num{0};
std::vector<shard_id_t> shard_list;

HSHomeObject& home_obj_;
homestore::group_id_t group_id_;
pg_id_t pg_id_;
shared< homestore::ReplDev > repl_dev_;

//snapshot info, can be used as a checkpoint for recovery
snapshot_info_superblk snp_info_;
// other stats for snapshot transmission progress
};

private:
shared< HeapChunkSelector > chunk_selector_;
unique< HttpManager > http_mgr_;
Expand Down
47 changes: 28 additions & 19 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@
#include <sisl/logging/logging.h>
#include <sisl/options/options.h>
#include <sisl/settings/settings.hpp>
#include "generated/resync_pg_shard_generated.h"
#include "generated/resync_blob_data_generated.h"
#include "generated/resync_pg_data_generated.h"
#include "generated/resync_shard_data_generated.h"

namespace homeobject {
HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id,
Expand Down Expand Up @@ -36,26 +37,34 @@ PG* HSHomeObject::PGBlobIterator::get_pg_metadata() {
}

void HSHomeObject::PGBlobIterator::create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
auto pg = get_pg_metadata();
auto& pg_info = pg->pg_info_;
auto& pg_shards = pg->shards_;
// auto pg = get_pg_metadata();
// auto& pg_info = pg->pg_info_;
// auto& pg_shards = pg->shards_;
//
// flatbuffers::FlatBufferBuilder builder;
// std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size());
// std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin());
// auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid));
//
// std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries;
// for (auto& shard : pg_shards) {
// auto& shard_info = shard->info;
// // TODO add lsn.
// shard_entries.push_back(CreateShardInfoEntry(
// builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id,
// shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time));
// }
// builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries)));
// meta_blob = sisl::io_blob_safe{builder.GetSize()};
// std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
}

flatbuffers::FlatBufferBuilder builder;
std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size());
std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin());
auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid));
void HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& meta_blob) {
//TODO
}

std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries;
for (auto& shard : pg_shards) {
auto& shard_info = shard->info;
// TODO add lsn.
shard_entries.push_back(CreateShardInfoEntry(
builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id,
shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time));
}
builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries)));
meta_blob = sisl::io_blob_safe{builder.GetSize()};
std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
void HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
//TODO
}

int64_t HSHomeObject::PGBlobIterator::get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
Expand Down
3 changes: 2 additions & 1 deletion src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
#include "hs_backend_config.hpp"
#include "lib/blob_route.hpp"

#include "generated/resync_pg_shard_generated.h"
#include "generated/resync_blob_data_generated.h"
#include "generated/resync_pg_data_generated.h"
#include "generated/resync_shard_data_generated.h"

namespace homeobject {
void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, const sisl::blob& key,
Expand Down
11 changes: 5 additions & 6 deletions src/lib/homestore_backend/tests/hs_blob_tests.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "homeobj_fixture.hpp"

#include "lib/homestore_backend/index_kv.hpp"
#include "generated/resync_pg_shard_generated.h"
#include "generated/resync_blob_data_generated.h"

TEST_F(HomeObjectFixture, BasicEquivalence) {
Expand Down Expand Up @@ -175,11 +174,11 @@ TEST_F(HomeObjectFixture, PGBlobIterator) {
pg1_iter->create_pg_shard_snapshot_data(meta_blob);
ASSERT_TRUE(meta_blob.size() > 0);

auto pg_req = GetSizePrefixedResyncPGShardInfo(meta_blob.bytes());
ASSERT_EQ(pg_req->pg()->pg_id(), pg1->pg_info_.id);
auto u1 = pg_req->pg()->replica_set_uuid();
auto u2 = pg1->pg_info_.replica_set_uuid;
ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));
// auto pg_req = GetSizePrefixedResyncPGShardInfo(meta_blob.bytes());
// ASSERT_EQ(pg_req->pg()->pg_id(), pg1->pg_info_.id);
// auto u1 = pg_req->pg()->replica_set_uuid();
// auto u2 = pg1->pg_info_.replica_set_uuid;
// ASSERT_EQ(std::string(u1->begin(), u1->end()), std::string(u2.begin(), u2.end()));

// Verify get blobs for pg.
uint64_t max_num_blobs_in_batch = 3, max_batch_size_bytes = 128 * Mi;
Expand Down

0 comments on commit 3c22266

Please sign in to comment.