Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement ReplicationStateMachine::write_snapshot_data #227

Merged
merged 5 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.7"
version = "2.1.8"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
Expand Down
1 change: 1 addition & 0 deletions src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
hs_shard_manager.cpp
hs_pg_manager.cpp
pg_blob_iterator.cpp
snapshot_receive_handler.cpp
index_kv.cpp
heap_chunk_selector.cpp
replication_state_machine.cpp
Expand Down
100 changes: 52 additions & 48 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,48 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
});
}

bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob_info) {
HS_PG* hs_pg{nullptr};
{
shared_lock lock_guard(_pg_lock);
const auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = dynamic_cast< HS_PG* >(iter->second.get());
}
shared< BlobIndexTable > index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not initialized");

// Write to index table with key {shard id, blob id} and value {pba}.
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, exist_already:{}, status:{}, pbas: {}",
blob_info.shard_id, blob_info.blob_id, exist_already, status, blob_info.pbas.to_string());
if (!exist_already) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status));
return false;
}
// The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always
// done ahead of the Index Checkpoint. Hence, if the index already has this entity, whatever durable
// counters updated as part of the update would have been persisted already in PG superblock. So if we were
// to increment now, it will be a duplicate increment, hence ignoring for cases where index already exist
// for this blob put.

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_info](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
auto next_blob_id = blob_info.blob_id + 1;
while (next_blob_id > existing_blob_id &&
// we need update the blob_sequence_num to existing_blob_id+1 so that if leader changes, we can
// still get the up-to-date blob_sequence_num
!de.blob_sequence_num.compare_exchange_weak(existing_blob_id, next_blob_id)) {}
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
de.total_occupied_blk_count.fetch_add(blob_info.pbas.blk_count(), std::memory_order_relaxed);
});
}
return true;
}

void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
homestore::MultiBlkId const& pbas,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand All @@ -189,54 +231,19 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
}

auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes())));
shared< BlobIndexTable > index_table;
HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(msg_header->pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}

index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
auto const pg_id = msg_header->pg_id;

BlobInfo blob_info;
blob_info.shard_id = msg_header->shard_id;
blob_info.blob_id = blob_id;
blob_info.pbas = pbas;

// Write to index table with key {shard id, blob id } and value {pba}.
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}",
msg_header->shard_id, blob_id, lsn, exist_already, status, pbas.to_string());
if (!exist_already) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status));
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR))); }
return;
} else {
// The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always
// done ahead of the Index Checkpoint. Hence if the index already has this entity, whatever durable counters
// updated as part of the update would have been persisted already in PG superblock. So if we were to
// increment now, it will be a duplicate increment, hence ignorning for cases where index already exist for
// this blob put.

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_id, &pbas](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
auto next_blob_id = blob_id + 1;
while ((next_blob_id > existing_blob_id) &&
// we need update the blob_sequence_num to existing_blob_id+1 so that if leader changes, we can
// still get the up-to-date blob_sequence_num
!de.blob_sequence_num.compare_exchange_weak(existing_blob_id, next_blob_id)) {}
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
de.total_occupied_blk_count.fetch_add(pbas.blk_count(), std::memory_order_relaxed);
});
}
bool success = local_add_blob_info(pg_id, blob_info);

if (ctx) {
ctx->promise_.setValue(success ? BlobManager::Result< BlobInfo >(blob_info)
: folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR)));
}
if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
}

BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset,
Expand Down Expand Up @@ -342,19 +349,16 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
if (shard_iter == _shard_map.end()) {
auto chunk_id = get_shard_chunk(msg_header->shard_id);
if (!chunk_id.has_value()) {
LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later",
msg_header->shard_id);
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", hs_shard->sb_->chunk_id);
BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", *chunk_id);

homestore::blk_alloc_hints hints;
hints.chunk_id_hint = hs_shard->sb_->chunk_id;
hints.chunk_id_hint = *chunk_id;
return hints;
}

Expand Down
49 changes: 35 additions & 14 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,33 @@ class HSHomeObject : public HomeObjectImpl {

// SnapshotReceiverContext is the context used in follower side snapshot receiving. [drafting] The functions is not the final version.
struct SnapshotReceiveHandler {
SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id);
enum ErrorCode {
ALLOC_BLK_ERR = 1,
WRITE_DATA_ERR,
INVALID_BLOB_HEADER,
BLOB_DATA_CORRUPTED,
ADD_BLOB_INDEX_ERR,
};

constexpr static shard_id_t invalid_shard_id = 0;
constexpr static shard_id_t shard_list_end_marker = ULLONG_MAX;

SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id, int64_t lsn,
shared< homestore::ReplDev > repl_dev);
void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta);
void process_shard_snapshot_data(ResyncShardMetaData const& shard_meta, snp_obj_id_t& obj_id);
void process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blob, snp_obj_id_t& obj_id);
int process_shard_snapshot_data(ResyncShardMetaData const& shard_meta);
int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num);
shard_id_t get_next_shard() const;

//snapshot start lsn
int64_t snp_lsn{0};
shard_id_t shard_cursor{0};
blob_id_t blob_cursor{0};
snp_batch_id_t cur_batch_num{0};
std::vector<shard_id_t> shard_list;
shard_id_t shard_cursor_{invalid_shard_id};
snp_batch_id_t cur_batch_num_{0};
std::vector< shard_id_t > shard_list_;

const int64_t snp_lsn_{0};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't see this is being used expect for the check to reuse this handler instance. Can we remove this field and the check and always reuse this handler instance?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The follower may receive a new snapshot transmission request if the leader crashes or if the previous snapshot transmission times out. The current LSN is necessary for the follower to recognize such cases and discard the current handler instance.

HSHomeObject& home_obj_;
homestore::group_id_t group_id_;
pg_id_t pg_id_;
shared< homestore::ReplDev > repl_dev_;
const homestore::group_id_t group_id_;
const pg_id_t pg_id_;
const shared< homestore::ReplDev > repl_dev_;

//snapshot info, can be used as a checkpoint for recovery
snapshot_info_superblk snp_info_;
Expand All @@ -357,17 +368,18 @@ class HSHomeObject : public HomeObjectImpl {
const homestore::MultiBlkId& blkid) const;

// create pg related
PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
static PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
HS_PG* local_create_pg(shared< homestore::ReplDev > repl_dev, PGInfo pg_info);
static std::string serialize_pg_info(const PGInfo& info);
static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size);
void add_pg_to_map(unique< HS_PG > hs_pg);

// create shard related
shard_id_t generate_new_shard_id(pg_id_t pg);
uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t) const;

static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size);
static std::string serialize_shard_info(const ShardInfo& info);
void local_create_shard(ShardInfo shard_info, homestore::chunk_num_t chunk_num, homestore::blk_count_t blk_count);
void add_new_shard_to_map(ShardPtr&& shard);
void update_shard_in_map(const ShardInfo& shard_info);

Expand Down Expand Up @@ -452,6 +464,14 @@ class HSHomeObject : public HomeObjectImpl {
*/
std::optional< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const;

/**
* @brief Get the sequence number of the shard from the shard id.
*
* @param shard_id The ID of the shard.
* @return The sequence number of the shard.
*/
static uint64_t get_sequence_num_from_shard_id(uint64_t shard_id);

/**
* @brief recover PG and shard from the superblock.
*
Expand All @@ -473,6 +493,7 @@ class HSHomeObject : public HomeObjectImpl {
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);
bool local_add_blob_info(pg_id_t pg_id, BlobInfo const &blob_info);
homestore::ReplResult< homestore::blk_alloc_hints >
blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx);
void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size,
Expand Down
56 changes: 34 additions & 22 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/string_generator.hpp>
#include <homestore/replication_service.hpp>
#include <utility>
#include "hs_homeobject.hpp"
#include "replication_state_machine.hpp"

Expand Down Expand Up @@ -93,6 +94,38 @@ PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDe
});
}

HSHomeObject::HS_PG* HSHomeObject::local_create_pg(shared< ReplDev > repl_dev, PGInfo pg_info) {
koujl marked this conversation as resolved.
Show resolved Hide resolved
auto pg_id = pg_info.id;
{
auto lg = shared_lock(_pg_lock);
if (auto it = _pg_map.find(pg_id); it != _pg_map.end()) {
LOGW("PG already exists, pg_id {}", pg_id);
return dynamic_cast< HS_PG* >(it->second.get());
}
}

// TODO: select chunks for this pg

// create index table and pg
auto index_table = create_index_table();
auto uuid_str = boost::uuids::to_string(index_table->uuid());

auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table);
auto ret = hs_pg.get();
{
scoped_lock lck(index_lock_);
RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
hs_pg->index_table_ = index_table;
// Add to index service, so that it gets cleaned up when index service is shutdown.
hs()->index_service().add_index_table(index_table);
add_pg_to_map(std::move(hs_pg));
}
return ret;
}

void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& header,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -120,28 +153,7 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he
}

auto pg_info = deserialize_pg_info(serailized_pg_info_buf, serailized_pg_info_size);
auto pg_id = pg_info.id;
if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) {
LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id);
if (ctx) { ctx->promise_.setValue(folly::Unit()); }
return;
}

// create index table and pg
// TODO create index table during create shard.
auto index_table = create_index_table();
auto uuid_str = boost::uuids::to_string(index_table->uuid());

auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table);
std::scoped_lock lock_guard(index_lock_);
RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found");
index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table};

LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str);
hs_pg->index_table_ = index_table;
// Add to index service, so that it gets cleaned up when index service is shutdown.
homestore::hs()->index_service().add_index_table(index_table);
add_pg_to_map(std::move(hs_pg));
local_create_pg(std::move(repl_dev), pg_info);
if (ctx) ctx->promise_.setValue(folly::Unit());
}

Expand Down
56 changes: 30 additions & 26 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ shard_id_t HSHomeObject::generate_new_shard_id(pg_id_t pgid) {
return make_new_shard_id(pgid, new_sequence_num);
}

uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id_t) const {
return shard_id_t & (max_shard_num_in_pg() - 1);
uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id) {
koujl marked this conversation as resolved.
Show resolved Hide resolved
return shard_id & (max_shard_num_in_pg() - 1);
}

std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) {
Expand Down Expand Up @@ -276,6 +276,33 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head
}
}

void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num_t chunk_num,
homestore::blk_count_t blk_count) {
bool shard_exist = false;
{
scoped_lock lock_guard(_shard_lock);
shard_exist = (_shard_map.find(shard_info.id) != _shard_map.end());
}

if (!shard_exist) {
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, chunk_num));
// select_specific_chunk() will do something only when we are relaying journal after restart, during the
// runtime flow chunk is already been be mark busy when we write the shard info to the repldev.
chunk_selector_->select_specific_chunk(chunk_num);
}

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
{
shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(shard_info.placement_group);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}
hs_pg->durable_entities_update(
[blk_count](auto& de) { de.total_occupied_blk_count.fetch_add(blk_count, std::memory_order_relaxed); });
}

void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, homestore::MultiBlkId const& blkids,
shared< homestore::ReplDev > repl_dev,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
Expand Down Expand Up @@ -319,31 +346,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
auto shard_info = sb->info;
shard_info.lsn = lsn;

bool shard_exist = false;
{
std::scoped_lock lock_guard(_shard_lock);
shard_exist = (_shard_map.find(shard_info.id) != _shard_map.end());
}

if (!shard_exist) {
add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num()));
// select_specific_chunk() will do something only when we are relaying journal after restart, during the
// runtime flow chunk is already been be mark busy when we write the shard info to the repldev.
chunk_selector_->select_specific_chunk(blkids.chunk_num());
}
local_create_shard(shard_info, blkids.chunk_num(), blkids.blk_count());
if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); }

// update pg's total_occupied_blk_count
HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(shard_info.placement_group);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
hs_pg = static_cast< HS_PG* >(iter->second.get());
}
hs_pg->durable_entities_update([&blkids](auto& de) {
de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed);
});
LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id);

break;
Expand Down
Loading
Loading