Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Baseline resync msg protocol and data structure #221

Merged
merged 4 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.6"
version = "2.1.7"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
2 changes: 2 additions & 0 deletions src/include/homeobject/common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ using blob_id_t = uint64_t;
using peer_id_t = boost::uuids::uuid;
using pg_id_t = uint16_t;
using shard_id_t = uint64_t;
using snp_batch_id_t = uint16_t;
using snp_obj_id_t = uint64_t;

template < class E >
class Manager {
Expand Down
4 changes: 3 additions & 1 deletion src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ settings_gen_cpp(
${CMAKE_CURRENT_BINARY_DIR}/generated/
"${PROJECT_NAME}_homestore"
hs_backend_config.fbs
resync_pg_shard.fbs
resync_pg_data.fbs
resync_shard_data.fbs
resync_blob_data.fbs
resync_payload.fbs
)

add_subdirectory(tests)
Expand Down
39 changes: 37 additions & 2 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
#include "replication_message.hpp"
#include "homeobject/common.hpp"
#include "index_kv.hpp"
#include "generated/resync_pg_data_generated.h"
#include "generated/resync_shard_data_generated.h"
#include "generated/resync_blob_data_generated.h"

namespace homestore {
struct meta_blk;
Expand Down Expand Up @@ -124,6 +127,8 @@ class HSHomeObject : public HomeObjectImpl {
ShardInfo info;
homestore::chunk_num_t chunk_id;
};
// TODO: this superblock is intended to store snapshot metadata/status for recovery; it is currently an empty placeholder.
struct snapshot_info_superblk {};
yuwmao marked this conversation as resolved.
Show resolved Hide resolved
#pragma pack()

public:
Expand Down Expand Up @@ -287,12 +292,18 @@ class HSHomeObject : public HomeObjectImpl {
int64_t get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
std::vector< HSHomeObject::BlobInfoData >& blob_vec, bool& end_of_shard);
void create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_pg_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_blobs_snapshot_data(std::vector< HSHomeObject::BlobInfoData >& blob_vec,
sisl::io_blob_safe& data_blob, bool end_of_shard);
bool end_of_scan() const;

uint64_t cur_shard_seq_num_{1};
int64_t cur_blob_id_{-1};
uint64_t snp_start_lsn{0};
std::vector<ShardInfo> shard_list{0};
shard_id_t cur_shard_seq_num_{1};
std::vector<BlobInfo> cur_blob_list{0};
int64_t last_end_blob_idx{-1};
int64_t last_batch_size{0};
uint64_t max_shard_seq_num_{0};
uint64_t cur_snapshot_batch_num{0};
HSHomeObject& home_obj_;
Expand All @@ -301,6 +312,30 @@ class HSHomeObject : public HomeObjectImpl {
shared< homestore::ReplDev > repl_dev_;
};

// SnapshotReceiveHandler is the context used for follower-side snapshot receiving. [drafting] These functions are not the final version.
// Follower-side state for receiving a snapshot (resync) stream.
// Declares one process_* entry point per resync payload type (PG metadata, shard metadata,
// blob data batch); only declarations are visible here, the definitions live elsewhere.
struct SnapshotReceiveHandler {
// NOTE(review): the parameter is spelled `pg_id_` (trailing underscore), the same spelling as the
// member declared below — confirm the definition initialises the member as intended.
SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id);
// Handles a PG-level metadata message.
void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta);
// Handles a shard-level metadata message. `obj_id` is taken by non-const reference;
// presumably it is updated to the next object to request — TODO confirm against the definition.
void process_shard_snapshot_data(ResyncShardMetaData const& shard_meta, snp_obj_id_t& obj_id);
// Handles a batch of blob data. `obj_id` is taken by non-const reference (see note above).
void process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blob, snp_obj_id_t& obj_id);

// Snapshot start LSN; starts at 0.
int64_t snp_lsn{0};
// Shard / blob cursors, both starting at 0 — presumably the receive progress within the
// snapshot; TODO confirm exact semantics.
shard_id_t shard_cursor{0};
blob_id_t blob_cursor{0};
// Current batch number, starting at 0 (snp_batch_id_t is a 16-bit unsigned alias).
snp_batch_id_t cur_batch_num{0};
// Shard ids associated with this snapshot; empty until populated.
std::vector<shard_id_t> shard_list;

// Owning HomeObject instance, held by reference.
HSHomeObject& home_obj_;
homestore::group_id_t group_id_;
pg_id_t pg_id_;
shared< homestore::ReplDev > repl_dev_;
yuwmao marked this conversation as resolved.
Show resolved Hide resolved

// Snapshot info; can be used as a checkpoint for recovery.
snapshot_info_superblk snp_info_;
// TODO: other stats for snapshot transmission progress.
};

private:
shared< HeapChunkSelector > chunk_selector_;
unique< HttpManager > http_mgr_;
Expand Down
156 changes: 82 additions & 74 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
#include <sisl/logging/logging.h>
#include <sisl/options/options.h>
#include <sisl/settings/settings.hpp>
#include "generated/resync_pg_shard_generated.h"
#include "generated/resync_blob_data_generated.h"
#include "generated/resync_pg_data_generated.h"
#include "generated/resync_shard_data_generated.h"

namespace homeobject {

HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id,
uint64_t upto_lsn) :
home_obj_(home_obj), group_id_(group_id) {
Expand Down Expand Up @@ -37,89 +37,97 @@ PG* HSHomeObject::PGBlobIterator::get_pg_metadata() {
}

void HSHomeObject::PGBlobIterator::create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
auto pg = get_pg_metadata();
auto& pg_info = pg->pg_info_;
auto& pg_shards = pg->shards_;
// auto pg = get_pg_metadata();
// auto& pg_info = pg->pg_info_;
// auto& pg_shards = pg->shards_;
//
// flatbuffers::FlatBufferBuilder builder;
// std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size());
// std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin());
// auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid));
//
// std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries;
// for (auto& shard : pg_shards) {
// auto& shard_info = shard->info;
// // TODO add lsn.
// shard_entries.push_back(CreateShardInfoEntry(
// builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id,
// shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time));
// }
// builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries)));
// meta_blob = sisl::io_blob_safe{builder.GetSize()};
// std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
}

flatbuffers::FlatBufferBuilder builder;
std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size());
std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin());
auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid));
// Stub: not implemented yet. The body is empty, so `meta_blob` is left untouched.
// Judging by the name, this is meant to serialise PG-level snapshot metadata into
// `meta_blob` — TODO confirm once implemented.
void HSHomeObject::PGBlobIterator::create_pg_snapshot_data(sisl::io_blob_safe& meta_blob) {
// TODO: implement.
}

std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries;
for (auto& shard : pg_shards) {
auto& shard_info = shard->info;
// TODO add lsn.
shard_entries.push_back(CreateShardInfoEntry(
builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id,
shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time));
}
builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries)));
meta_blob = sisl::io_blob_safe{builder.GetSize()};
std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
// Stub: not implemented yet. The body is empty, so `meta_blob` is left untouched.
// Judging by the name, this is meant to serialise shard-level snapshot metadata into
// `meta_blob` — TODO confirm once implemented.
void HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
// TODO: implement.
}

int64_t HSHomeObject::PGBlobIterator::get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
std::vector< BlobInfoData >& blob_data_vec, bool& end_of_shard) {
end_of_shard = false;
uint64_t total_bytes = 0, num_blobs = 0;
while (true) {
auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_shard_seq_num_, cur_blob_id_ + 1, max_num_blobs_in_batch);
if (!r) { return -1; }
auto& index_results_vec = r.value();
for (auto& info : index_results_vec) {
if (info.pbas == HSHomeObject::tombstone_pbas) {
// Skip deleted blobs
continue;
}
auto result = home_obj_
._get_blob_data(repl_dev_, info.shard_id, info.blob_id, 0 /*start_offset*/,
0 /* req_len */, info.pbas)
.get();
if (!result) {
LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id,
info.pbas.to_string(), result.error());
return -1;
}

auto& blob = result.value();
num_blobs++;
total_bytes += blob.body.size() + blob.user_key.size();
if (num_blobs > max_num_blobs_in_batch || total_bytes > max_batch_size_bytes) { return 0; }

BlobInfoData blob_data{{info.shard_id, info.blob_id, std::move(info.pbas)}, std::move(blob)};
blob_data_vec.push_back(std::move(blob_data));
cur_blob_id_ = info.blob_id;
}

if (index_results_vec.empty()) {
// We got empty results from index, which means we read
// all the blobs in the current shard
end_of_shard = true;
cur_shard_seq_num_++;
cur_blob_id_ = -1;
break;
}
}

// end_of_shard = false;
// uint64_t total_bytes = 0, num_blobs = 0;
// while (true) {
// auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_shard_seq_num_, cur_blob_id_ + 1, max_num_blobs_in_batch);
// if (!r) { return -1; }
// auto& index_results_vec = r.value();
// for (auto& info : index_results_vec) {
// if (info.pbas == HSHomeObject::tombstone_pbas) {
// // Skip deleted blobs
// continue;
// }
// auto result = home_obj_
// ._get_blob_data(repl_dev_, info.shard_id, info.blob_id, 0 /*start_offset*/,
// 0 /* req_len */, info.pbas)
// .get();
// if (!result) {
// LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id,
// info.pbas.to_string(), result.error());
// return -1;
// }
//
// auto& blob = result.value();
// num_blobs++;
// total_bytes += blob.body.size() + blob.user_key.size();
// if (num_blobs > max_num_blobs_in_batch || total_bytes > max_batch_size_bytes) { return 0; }
//
// BlobInfoData blob_data{{info.shard_id, info.blob_id, std::move(info.pbas)}, std::move(blob)};
// blob_data_vec.push_back(std::move(blob_data));
// cur_blob_id_ = info.blob_id;
// }
//
// if (index_results_vec.empty()) {
// // We got empty results from index, which means we read
// // all the blobs in the current shard
// end_of_shard = true;
// cur_shard_seq_num_++;
// cur_blob_id_ = -1;
// break;
// }
// }
//
return 0;
}

void HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(std::vector< BlobInfoData >& blob_data_vec,
sisl::io_blob_safe& data_blob, bool end_of_shard) {
std::vector< ::flatbuffers::Offset< BlobData > > blob_entries;
flatbuffers::FlatBufferBuilder builder;
for (auto& b : blob_data_vec) {
blob_entries.push_back(
CreateBlobData(builder, b.shard_id, b.blob_id, b.blob.user_key.size(), b.blob.body.size(),
builder.CreateVector(r_cast< uint8_t* >(const_cast< char* >(b.blob.user_key.data())),
b.blob.user_key.size()),
builder.CreateVector(b.blob.body.bytes(), b.blob.body.size())));
}
builder.FinishSizePrefixed(
CreateResyncBlobDataBatch(builder, builder.CreateVector(blob_entries), end_of_shard /* end_of_batch */));
data_blob = sisl::io_blob_safe{builder.GetSize()};
std::memcpy(data_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
// std::vector< ::flatbuffers::Offset< BlobData > > blob_entries;
// flatbuffers::FlatBufferBuilder builder;
// for (auto& b : blob_data_vec) {
// blob_entries.push_back(
// CreateBlobData(builder, b.shard_id, b.blob_id, b.blob.user_key.size(), b.blob.body.size(),
// builder.CreateVector(r_cast< uint8_t* >(const_cast< char* >(b.blob.user_key.data())),
// b.blob.user_key.size()),
// builder.CreateVector(b.blob.body.bytes(), b.blob.body.size())));
// }
// builder.FinishSizePrefixed(
// CreateResyncBlobDataBatch(builder, builder.CreateVector(blob_entries), end_of_shard /* end_of_batch */));
// data_blob = sisl::io_blob_safe{builder.GetSize()};
// std::memcpy(data_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
}

bool HSHomeObject::PGBlobIterator::end_of_scan() const {
Expand Down
Loading