Skip to content

Commit

Permalink
Implement ReplicationStateMachine::write_snapshot_data (#227)
Browse files Browse the repository at this point in the history
Implemented ReplicationStateMachine::write_snapshot_data() method and 
SnapshotReceiveHandler logic. Additionally, extracted local_create_pg, 
local_create_shard, and local_add_blob_info functions for follower data 
creation.
  • Loading branch information
koujl authored Nov 26, 2024
1 parent 7b9819d commit 303a0a0
Show file tree
Hide file tree
Showing 10 changed files with 467 additions and 145 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.7"
version = "2.1.8"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
hs_shard_manager.cpp
hs_pg_manager.cpp
pg_blob_iterator.cpp
snapshot_receive_handler.cpp
index_kv.cpp
heap_chunk_selector.cpp
replication_state_machine.cpp
Expand Down
100 changes: 52 additions & 48 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,48 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
});
}

bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob_info) {
HS_PG* hs_pg{nullptr};
{
shared_lock lock_guard(_pg_lock);
const auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
}
shared< BlobIndexTable > index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not initialized");

// Write to index table with key {shard id, blob id} and value {pba}.
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, exist_already:{}, status:{}, pbas: {}",
blob_info.shard_id, blob_info.blob_id, exist_already, status, blob_info.pbas.to_string());
if (!exist_already) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
return false;
}
// The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always
// done ahead of the Index Checkpoint. Hence, if the index already has this entity, whatever durable
// counters updated as part of the update would have been persisted already in PG superblock. So if we were
// to increment now, it will be a duplicate increment, hence ignoring for cases where index already exist
// for this blob put.

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_info](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
auto next_blob_id = blob_info.blob_id + 1;
while (next_blob_id > existing_blob_id &&
// we need update the blob_sequence_num to existing_blob_id+1 so that if leader changes, we can
// still get the up-to-date blob_sequence_num
!de.blob_sequence_num.compare_exchange_weak(existing_blob_id, next_blob_id)) {}
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
de.total_occupied_blk_count.fetch_add(blob_info.pbas.blk_count(), std::memory_order_relaxed);
});
}
return true;
}

void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
homestore::MultiBlkId const& pbas,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand All @@ -189,54 +231,19 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
}

auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes())));
shared< BlobIndexTable > index_table;
HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(msg_header->pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}

index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
auto const pg_id = msg_header->pg_id;

BlobInfo blob_info;
blob_info.shard_id = msg_header->shard_id;
blob_info.blob_id = blob_id;
blob_info.pbas = pbas;

// Write to index table with key {shard id, blob id } and value {pba}.
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}",
msg_header->shard_id, blob_id, lsn, exist_already, status, pbas.to_string());
if (!exist_already) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status));
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR))); }
return;
} else {
// The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always
// done ahead of the Index Checkpoint. Hence if the index already has this entity, whatever durable counters
// updated as part of the update would have been persisted already in PG superblock. So if we were to
// increment now, it will be a duplicate increment, hence ignorning for cases where index already exist for
// this blob put.

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_id, &pbas](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
auto next_blob_id = blob_id + 1;
while ((next_blob_id > existing_blob_id) &&
// we need update the blob_sequence_num to existing_blob_id+1 so that if leader changes, we can
// still get the up-to-date blob_sequence_num
!de.blob_sequence_num.compare_exchange_weak(existing_blob_id, next_blob_id)) {}
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
de.total_occupied_blk_count.fetch_add(pbas.blk_count(), std::memory_order_relaxed);
});
}
bool success = local_add_blob_info(pg_id, blob_info);

if (ctx) {
ctx->promise_.setValue(success ? BlobManager::Result< BlobInfo >(blob_info)
: folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR)));
}
if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
}

BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset,
Expand Down Expand Up @@ -342,19 +349,16 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
if (shard_iter == _shard_map.end()) {
auto chunk_id = get_shard_chunk(msg_header->shard_id);
if (!chunk_id.has_value()) {
LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later",
msg_header->shard_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", hs_shard->sb_->chunk_id);
BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", *chunk_id);

homestore::blk_alloc_hints hints;
hints.chunk_id_hint = hs_shard->sb_->chunk_id;
hints.chunk_id_hint = *chunk_id;
return hints;
}

Expand Down
49 changes: 35 additions & 14 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,33 @@ class HSHomeObject : public HomeObjectImpl {

// SnapshotReceiverContext is the context used in follower side snapshot receiving. [drafting] The functions is not the final version.
struct SnapshotReceiveHandler {
SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id);
enum ErrorCode {
ALLOC_BLK_ERR = 1,
WRITE_DATA_ERR,
INVALID_BLOB_HEADER,
BLOB_DATA_CORRUPTED,
ADD_BLOB_INDEX_ERR,
};

constexpr static shard_id_t invalid_shard_id = 0;
constexpr static shard_id_t shard_list_end_marker = ULLONG_MAX;

SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id, int64_t lsn,
shared< homestore::ReplDev > repl_dev);
void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta);
void process_shard_snapshot_data(ResyncShardMetaData const& shard_meta, snp_obj_id_t& obj_id);
void process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blob, snp_obj_id_t& obj_id);
int process_shard_snapshot_data(ResyncShardMetaData const& shard_meta);
int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num);
shard_id_t get_next_shard() const;

//snapshot start lsn
int64_t snp_lsn{0};
shard_id_t shard_cursor{0};
blob_id_t blob_cursor{0};
snp_batch_id_t cur_batch_num{0};
std::vector<shard_id_t> shard_list;
shard_id_t shard_cursor_{invalid_shard_id};
snp_batch_id_t cur_batch_num_{0};
std::vector< shard_id_t > shard_list_;

const int64_t snp_lsn_{0};
HSHomeObject& home_obj_;
homestore::group_id_t group_id_;
pg_id_t pg_id_;
shared< homestore::ReplDev > repl_dev_;
const homestore::group_id_t group_id_;
const pg_id_t pg_id_;
const shared< homestore::ReplDev > repl_dev_;

//snapshot info, can be used as a checkpoint for recovery
snapshot_info_superblk snp_info_;
Expand All @@ -357,17 +368,18 @@ class HSHomeObject : public HomeObjectImpl {
const homestore::MultiBlkId& blkid) const;

// create pg related
PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
static PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
HS_PG* local_create_pg(shared< homestore::ReplDev > repl_dev, PGInfo pg_info);
static std::string serialize_pg_info(const PGInfo& info);
static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size);
void add_pg_to_map(unique< HS_PG > hs_pg);

// create shard related
shard_id_t generate_new_shard_id(pg_id_t pg);
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t) const;

static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size);
static std::string serialize_shard_info(const ShardInfo& info);
void local_create_shard(ShardInfo shard_info, homestore::chunk_num_t chunk_num, homestore::blk_count_t blk_count);
void add_new_shard_to_map(ShardPtr&& shard);
void update_shard_in_map(const ShardInfo& shard_info);

Expand Down Expand Up @@ -452,6 +464,14 @@ class HSHomeObject : public HomeObjectImpl {
*/
std::optional< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const;

/**
* @brief Get the sequence number of the shard from the shard id.
*
* @param shard_id The ID of the shard.
* @return The sequence number of the shard.
*/
static uint64_t get_sequence_num_from_shard_id(uint64_t shard_id);

/**
* @brief recover PG and shard from the superblock.
*
Expand All @@ -473,6 +493,7 @@ class HSHomeObject : public HomeObjectImpl {
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
bool local_add_blob_info(pg_id_t pg_id, BlobInfo const &blob_info);
homestore::ReplResult< homestore::blk_alloc_hints >
blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx);
void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size,
Expand Down
56 changes: 34 additions & 22 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <homestore/replication_service.hpp>
#include <utility>
#include "hs_homeobject.hpp"
#include "replication_state_machine.hpp"

Expand Down Expand Up @@ -93,6 +94,38 @@ PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDe
});
}

HSHomeObject::HS_PG* HSHomeObject::local_create_pg(shared< ReplDev > repl_dev, PGInfo pg_info) {
auto pg_id = pg_info.id;
{
auto lg = shared_lock(_pg_lock);
if (auto it = _pg_map.find(pg_id); it != _pg_map.end()) {
LOGW("PG already exists, pg_id {}", pg_id);
return dynamic_cast< HS_PG* >(it->second.get());
}
}

// TODO: select chunks for this pg

// create index table and pg
auto index_table = create_index_table();
auto uuid_str = boost::uuids::to_string(index_table->uuid());

auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table);
auto ret = hs_pg.get();
{
scoped_lock lck(index_lock_);
RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
hs_pg->index_table_ = index_table;
// Add to index service, so that it gets cleaned up when index service is shutdown.
hs()->index_service().add_index_table(index_table);
add_pg_to_map(std::move(hs_pg));
}
return ret;
}

void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& header,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -120,28 +153,7 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he
}

auto pg_info = deserialize_pg_info(serailized_pg_info_buf, serailized_pg_info_size);
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) {
LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id);
if (ctx) { ctx->promise_.setValue(folly::Unit()); }
return;
}

// create index table and pg
// TODO create index table during create shard.
auto index_table = create_index_table();
auto uuid_str = boost::uuids::to_string(index_table->uuid());

auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table);
std::scoped_lock lock_guard(index_lock_);
RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
hs_pg->index_table_ = index_table;
// Add to index service, so that it gets cleaned up when index service is shutdown.
homestore::hs()->index_service().add_index_table(index_table);
add_pg_to_map(std::move(hs_pg));
local_create_pg(std::move(repl_dev), pg_info);
if (ctx) ctx->promise_.setValue(folly::Unit());
}

Expand Down
56 changes: 30 additions & 26 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ shard_id_t HSHomeObject::generate_new_shard_id(pg_id_t pgid) {
return make_new_shard_id(pgid, new_sequence_num);
}

uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id_t) const {
return shard_id_t & (max_shard_num_in_pg() - 1);
uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id) {
return shard_id & (max_shard_num_in_pg() - 1);
}

std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) {
Expand Down Expand Up @@ -276,6 +276,33 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head
}
}

void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num_t chunk_num,
homestore::blk_count_t blk_count) {
bool shard_exist = false;
{
scoped_lock lock_guard(_shard_lock);
shard_exist = (_shard_map.find(shard_info.id) != _shard_map.end());
}

if (!shard_exist) {
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, chunk_num));
// select_specific_chunk() will do something only when we are relaying journal after restart, during the
// runtime flow chunk is already been be mark busy when we write the shard info to the repldev.
chunk_selector_->select_specific_chunk(chunk_num);
}

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
{
shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(shard_info.placement_group);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}
hs_pg->durable_entities_update(
[blk_count](auto& de) { de.total_occupied_blk_count.fetch_add(blk_count, std::memory_order_relaxed); });
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -319,31 +346,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
auto shard_info = sb->info;
shard_info.lsn = lsn;

bool shard_exist = false;
{
std::scoped_lock lock_guard(_shard_lock);
shard_exist = (_shard_map.find(shard_info.id) != _shard_map.end());
}

if (!shard_exist) {
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num()));
// select_specific_chunk() will do something only when we are relaying journal after restart, during the
// runtime flow chunk is already been be mark busy when we write the shard info to the repldev.
chunk_selector_->select_specific_chunk(blkids.chunk_num());
}
local_create_shard(shard_info, blkids.chunk_num(), blkids.blk_count());
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(shard_info.placement_group);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}
hs_pg->durable_entities_update([&blkids](auto& de) {
de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
});
LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id);

break;
Expand Down
Loading

0 comments on commit 303a0a0

Please sign in to comment.