Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor SnapshotReceiveHandler & Add UT #232

Merged
merged 5 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,8 @@ class HSHomeObject : public HomeObjectImpl {
uint64_t max_batch_size_;
};

// SnapshotReceiverContext is the context used in follower side snapshot receiving. [drafting] The functions is not the final version.
struct SnapshotReceiveHandler {
class SnapshotReceiveHandler {
public:
enum ErrorCode {
ALLOC_BLK_ERR = 1,
WRITE_DATA_ERR,
Expand All @@ -329,24 +329,37 @@ class HSHomeObject : public HomeObjectImpl {
constexpr static shard_id_t invalid_shard_id = 0;
constexpr static shard_id_t shard_list_end_marker = ULLONG_MAX;

SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id, int64_t lsn,
shared< homestore::ReplDev > repl_dev);
SnapshotReceiveHandler(HSHomeObject& home_obj, shared< homestore::ReplDev > repl_dev);

void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta);
int process_shard_snapshot_data(ResyncShardMetaData const& shard_meta);
int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num);
int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num,
bool is_last_batch);

int64_t get_context_lsn() const;
void reset_context(int64_t lsn, pg_id_t pg_id);
shard_id_t get_shard_cursor() const;
shard_id_t get_next_shard() const;

shard_id_t shard_cursor_{invalid_shard_id};
snp_batch_id_t cur_batch_num_{0};
std::vector< shard_id_t > shard_list_;
private:
// SnapshotContext is the context data of current snapshot transmission
struct SnapshotContext {
shard_id_t shard_cursor{invalid_shard_id};
snp_batch_id_t cur_batch_num{0};
std::vector< shard_id_t > shard_list;
const int64_t snp_lsn;
const pg_id_t pg_id;
shared< BlobIndexTable > index_table;

SnapshotContext(int64_t lsn, pg_id_t pg_id) : snp_lsn{lsn}, pg_id{pg_id} {}
};

const int64_t snp_lsn_{0};
HSHomeObject& home_obj_;
const homestore::group_id_t group_id_;
const pg_id_t pg_id_;
const shared< homestore::ReplDev > repl_dev_;

//snapshot info, can be used as a checkpoint for recovery
std::unique_ptr< SnapshotContext > ctx_;

// snapshot info, can be used as a checkpoint for recovery
snapshot_info_superblk snp_info_;
// other stats for snapshot transmission progress
};
Expand Down
34 changes: 19 additions & 15 deletions src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap

if (snp_data->user_ctx == nullptr) {
// Create the pg blob iterator for the first time.
pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id());
pg_iter = new HSHomeObject::PGBlobIterator(*home_object_, repl_dev()->group_id(), context->get_lsn());
snp_data->user_ctx = (void*)pg_iter;
} else { pg_iter = r_cast< HSHomeObject::PGBlobIterator* >(snp_data->user_ctx); }

Expand Down Expand Up @@ -268,10 +268,13 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
std::shared_ptr< homestore::snapshot_data > snp_data) {
RELEASE_ASSERT(context != nullptr, "Context null");
RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null");
auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset));
auto r_dev = repl_dev();
if (!m_snp_rcv_handler) {
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(*home_object_, r_dev);
}

auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot();
auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset));
auto log_suffix = fmt::format("group={} term={} lsn={} shard={} batch_num={} size={}",
uuids::to_string(r_dev->group_id()), s->get_last_log_term(), s->get_last_log_idx(),
obj_id.shard_seq_num, obj_id.batch_id, snp_data->blob.size());
Expand All @@ -281,10 +284,6 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
return;
}

// Check if the snapshot context is same as the current snapshot context.
// If not, drop the previous context and re-init a new one
if (m_snp_rcv_handler && m_snp_rcv_handler->snp_lsn_ != context->get_lsn()) { m_snp_rcv_handler.reset(nullptr); }

// Check message integrity
// TODO: add a flip here to simulate corrupted message
auto header = r_cast< const SyncMessageHeader* >(snp_data->blob.cbytes());
Expand All @@ -304,20 +303,24 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
// PG metadata & shard list message
RELEASE_ASSERT(obj_id.batch_id == 0, "Invalid obj_id");

// TODO: Reset all data of current PG - let's resync on a pristine base

auto pg_data = GetSizePrefixedResyncPGMetaData(data_buf);
m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >(
*home_object_, pg_data->pg_id(), r_dev->group_id(), context->get_lsn(), r_dev);

// Check if the snapshot context is same as the current snapshot context.
// If not, drop the previous context and re-init a new one
if (m_snp_rcv_handler->get_context_lsn() != context->get_lsn()) {
m_snp_rcv_handler->reset_context(pg_data->pg_id(), context->get_lsn());
// TODO: Reset all data of current PG - let's resync on a pristine base
}

m_snp_rcv_handler->process_pg_snapshot_data(*pg_data);
snp_data->offset =
objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_next_shard()), 0).value;
LOGD("Write snapshot, processed PG data pg_id:{} {}", pg_data->pg_id(), log_suffix);
return;
}

RELEASE_ASSERT(m_snp_rcv_handler,
"Snapshot receiver not initialized"); // Here we should have a valid snapshot receiver context
RELEASE_ASSERT(m_snp_rcv_handler->get_context_lsn() == context->get_lsn(), "Snapshot context lsn not matching");

if (obj_id.batch_id == 0) {
// Shard metadata message
RELEASE_ASSERT(obj_id.shard_seq_num != 0, "Invalid obj_id");
Expand All @@ -339,10 +342,11 @@ void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::sn
// Blob data message
// TODO: enhance error handling for wrong shard id - what may cause this?
RELEASE_ASSERT(obj_id.shard_seq_num ==
HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->shard_cursor_),
HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_shard_cursor()),
"Shard id not matching with the current shard cursor");
auto blob_batch = GetSizePrefixedResyncBlobDataBatch(data_buf);
auto ret = m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id);
auto ret =
m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id, blob_batch->is_last_batch());
if (ret) {
// Do not proceed, will request for resending the current blob batch
LOGE("Failed to process blob snapshot data lsn:{} obj_id {} shard {} batch {}, err {}", s->get_last_log_idx(),
Expand Down
120 changes: 73 additions & 47 deletions src/lib/homestore_backend/snapshot_receive_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,18 @@
#include <homestore/blkdata_service.hpp>

namespace homeobject {
HSHomeObject::SnapshotReceiveHandler::SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_,
homestore::group_id_t group_id, int64_t lsn,
HSHomeObject::SnapshotReceiveHandler::SnapshotReceiveHandler(HSHomeObject& home_obj,
shared< homestore::ReplDev > repl_dev) :
snp_lsn_(lsn), home_obj_(home_obj), group_id_(group_id), pg_id_(pg_id_), repl_dev_(std::move(repl_dev)) {}
home_obj_(home_obj), repl_dev_(std::move(repl_dev)) {}

void HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaData const& pg_meta) {
LOGI("process_pg_snapshot_data pg_id:{}", pg_meta.pg_id());

// Init shard list
shard_list_.clear();
ctx_->shard_list.clear();
const auto ids = pg_meta.shard_ids();
for (unsigned int i = 0; i < ids->size(); i++) {
shard_list_.push_back(ids->Get(i));
ctx_->shard_list.push_back(ids->Get(i));
}

// Create local PG
Expand Down Expand Up @@ -46,29 +45,31 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
// Persist shard meta on chunk data
homestore::chunk_num_t chunk_id = shard_meta.vchunk_id(); // FIXME: vchunk id to chunk id

shard_info_superblk shard_sb;
shard_sb.info.id = shard_meta.shard_id();
shard_sb.info.placement_group = shard_meta.pg_id();
shard_sb.info.state = static_cast< ShardInfo::State >(shard_meta.state());
shard_sb.info.lsn = shard_meta.created_lsn();
shard_sb.info.created_time = shard_meta.created_time();
shard_sb.info.last_modified_time = shard_meta.last_modified_time();
shard_sb.info.available_capacity_bytes = shard_meta.total_capacity_bytes();
shard_sb.info.total_capacity_bytes = shard_meta.total_capacity_bytes();
shard_sb.info.deleted_capacity_bytes = 0;
shard_sb.chunk_id = chunk_id;
sisl::io_blob_safe aligned_buf(sisl::round_up(sizeof(shard_info_superblk), io_align), io_align);
shard_info_superblk* shard_sb = r_cast< shard_info_superblk* >(aligned_buf.bytes());
shard_sb->info.id = shard_meta.shard_id();
shard_sb->info.placement_group = shard_meta.pg_id();
shard_sb->info.state = static_cast< ShardInfo::State >(shard_meta.state());
shard_sb->info.lsn = shard_meta.created_lsn();
shard_sb->info.created_time = shard_meta.created_time();
shard_sb->info.last_modified_time = shard_meta.last_modified_time();
shard_sb->info.available_capacity_bytes = shard_meta.total_capacity_bytes();
shard_sb->info.total_capacity_bytes = shard_meta.total_capacity_bytes();
shard_sb->info.deleted_capacity_bytes = 0;
shard_sb->chunk_id = chunk_id;

homestore::MultiBlkId blk_id;
const auto hints = home_obj_.chunk_selector()->chunk_to_hints(chunk_id);
const auto hints = home_obj_.chunk_selector()->chunk_to_hints(shard_sb->chunk_id);
auto status = homestore::data_service().alloc_blks(
sisl::round_up(sizeof(shard_sb), homestore::data_service().get_blk_size()), hints, blk_id);
sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id);
if (status != homestore::BlkAllocStatus::SUCCESS) {
LOGE("Failed to allocate blocks for shard {}", shard_meta.shard_id());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure will we always baseline resync to a brand new member? if not, maybe we need to let emergency gc kick in here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We prefer to clean up all the existing data in the PG at the start of a snapshot resync to reduce complexity. The cleanup logic will be implemented later.

return ALLOC_BLK_ERR;
}
shard_sb->chunk_id = blk_id.to_single_blkid().chunk_num(); // FIXME: remove this after integrating vchunk

const auto ret = homestore::data_service()
koujl marked this conversation as resolved.
Show resolved Hide resolved
.async_write(r_cast< char const* >(&shard_sb), sizeof(shard_sb), blk_id)
.async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
.thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
// TODO: do we need to update repl_dev metrics?
if (err) {
Expand All @@ -79,21 +80,29 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
return 0;
})
.get();
if (ret) {
if (ret.hasError()) {
LOGE("Failed to write shard info of shard_id {} to blk_id:{}", shard_meta.shard_id(), blk_id.to_string());
return WRITE_DATA_ERR;
}

// Now let's create local shard
home_obj_.local_create_shard(shard_sb.info, chunk_id, blk_id.blk_count());
shard_cursor_ = shard_meta.shard_id();
cur_batch_num_ = 0;
home_obj_.local_create_shard(shard_sb->info, shard_sb->chunk_id, blk_id.blk_count());
ctx_->shard_cursor = shard_meta.shard_id();
ctx_->cur_batch_num = 0;
return 0;
}

int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs,
const snp_batch_id_t batch_num) {
cur_batch_num_ = batch_num;
const snp_batch_id_t batch_num,
bool is_last_batch) {
ctx_->cur_batch_num = batch_num;

// Find chunk id for current shard
auto chunk_id = home_obj_.get_shard_chunk(ctx_->shard_cursor);
RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", ctx_->shard_cursor);
homestore::blk_alloc_hints hints;
hints.chunk_id_hint = *chunk_id;

for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) {
const auto blob = data_blobs.blob_list()->Get(i);

Expand All @@ -103,16 +112,15 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
continue;
}

// Check duplication to avoid reprocessing
shared< BlobIndexTable > index_table;
{
// Check duplication to avoid reprocessing. This may happen on resent blob batches.
if (!ctx_->index_table) {
std::shared_lock lock_guard(home_obj_._pg_lock);
auto iter = home_obj_._pg_map.find(pg_id_);
auto iter = home_obj_._pg_map.find(ctx_->pg_id);
koujl marked this conversation as resolved.
Show resolved Hide resolved
RELEASE_ASSERT(iter != home_obj_._pg_map.end(), "PG not found");
index_table = dynamic_cast< HS_PG* >(iter->second.get())->index_table_;
ctx_->index_table = dynamic_cast< HS_PG* >(iter->second.get())->index_table_;
}
RELEASE_ASSERT(index_table != nullptr, "Index table instance null");
if (home_obj_.get_blob_from_index_table(index_table, shard_cursor_, blob->blob_id())) {
RELEASE_ASSERT(ctx_->index_table != nullptr, "Index table instance null");
if (home_obj_.get_blob_from_index_table(ctx_->index_table, ctx_->shard_cursor, blob->blob_id())) {
LOGD("Skip already persisted blob_id:{}", blob->blob_id());
continue;
}
Expand Down Expand Up @@ -142,17 +150,15 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
}

// Alloc & persist blob data
auto chunk_id = home_obj_.get_shard_chunk(shard_cursor_);
RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", shard_cursor_);
homestore::blk_alloc_hints hints;
hints.chunk_id_hint = *chunk_id;

auto data_size = blob->data()->size();
sisl::io_blob_safe aligned_buf(sisl::round_up(data_size, io_align), io_align);
std::memcpy(aligned_buf.bytes(), blob_data, data_size);

homestore::MultiBlkId blk_id;
auto status = homestore::data_service().alloc_blks(
sisl::round_up(data_size, homestore::data_service().get_blk_size()), hints, blk_id);
sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id);
if (status != homestore::BlkAllocStatus::SUCCESS) {
LOGE("Failed to allocate blocks for shard {} blob {}", shard_cursor_, blob->blob_id());
LOGE("Failed to allocate blocks for shard {} blob {}", ctx_->shard_cursor, blob->blob_id());
return ALLOC_BLK_ERR;
}

Expand All @@ -164,7 +170,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
};

const auto ret = homestore::data_service()
.async_write(r_cast< char const* >(blob_data), data_size, blk_id)
.async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
.thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
// TODO: do we need to update repl_dev metrics?
if (err) {
Expand All @@ -175,32 +181,52 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
return 0;
})
.get();
if (ret) {
if (ret.hasError()) {
LOGE("Failed to write shard info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string());
free_allocated_blks();
return WRITE_DATA_ERR;
}

// Add local blob info to index & PG
bool success = home_obj_.local_add_blob_info(pg_id_, BlobInfo{shard_cursor_, blob->blob_id(), {0, 0, 0}});
bool success =
home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob->blob_id(), blk_id});
if (!success) {
LOGE("Failed to add blob info for blob_id:{}", blob->blob_id());
free_allocated_blks();
return ADD_BLOB_INDEX_ERR;
}
}

if (is_last_batch) {
// Release chunk for sealed shard
ShardInfo::State state;
{
std::scoped_lock lock_guard(home_obj_._shard_lock);
auto iter = home_obj_._shard_map.find(ctx_->shard_cursor);
state = (*iter->second)->info.state;
}
if (state == ShardInfo::State::SEALED) { home_obj_.chunk_selector()->release_chunk(chunk_id.value()); }
}

return 0;
}

// Returns the LSN of the snapshot transmission currently in progress, or -1
// when no snapshot context has been established yet (i.e. reset_context()
// has not been called since construction).
int64_t HSHomeObject::SnapshotReceiveHandler::get_context_lsn() const {
    if (!ctx_) { return -1; }
    return ctx_->snp_lsn;
}

// Starts (or restarts) a snapshot-receive session: drops any in-flight
// SnapshotContext (abandoning its shard cursor/batch progress) and allocates
// a fresh one pinned to the given snapshot LSN and PG.
// NOTE(review): the parameter order here is (lsn, pg_id), but the visible
// caller in replication_state_machine.cpp invokes
// reset_context(pg_data->pg_id(), context->get_lsn()) — the arguments appear
// swapped there; both are integral types so it compiles silently. TODO confirm
// and fix the call site (or swap this signature consistently with its
// declaration).
void HSHomeObject::SnapshotReceiveHandler::reset_context(int64_t lsn, pg_id_t pg_id) {
    ctx_ = std::make_unique< SnapshotContext >(lsn, pg_id);
}

// Returns the shard currently being received, or invalid_shard_id when no
// snapshot context exists yet. The null guard mirrors get_context_lsn(),
// which likewise returns a sentinel (-1) instead of dereferencing a null
// ctx_; without it, calling this before reset_context() is undefined
// behavior.
shard_id_t HSHomeObject::SnapshotReceiveHandler::get_shard_cursor() const {
    return ctx_ ? ctx_->shard_cursor : invalid_shard_id;
}

shard_id_t HSHomeObject::SnapshotReceiveHandler::get_next_shard() const {
if (shard_list_.empty()) { return shard_list_end_marker; }
if (ctx_->shard_list.empty()) { return shard_list_end_marker; }

if (shard_cursor_ == 0) { return shard_list_[0]; }
if (ctx_->shard_cursor == 0) { return ctx_->shard_list[0]; }

for (size_t i = 0; i < shard_list_.size(); ++i) {
if (shard_list_[i] == shard_cursor_) {
return (i + 1 < shard_list_.size()) ? shard_list_[i + 1] : shard_list_end_marker;
for (size_t i = 0; i < ctx_->shard_list.size(); ++i) {
if (ctx_->shard_list[i] == ctx_->shard_cursor) {
return (i + 1 < ctx_->shard_list.size()) ? ctx_->shard_list[i + 1] : shard_list_end_marker;
}
}

Expand Down
Loading
Loading