diff --git a/conanfile.py b/conanfile.py index 03f1966f..37e9ed27 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomeObjectConan(ConanFile): name = "homeobject" - version = "2.1.7" + version = "2.1.8" homepage = "https://github.com/eBay/HomeObject" description = "Blob Store built on HomeReplication" diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt index 0ac93cf0..5f6e97ae 100644 --- a/src/lib/homestore_backend/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -23,6 +23,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE hs_shard_manager.cpp hs_pg_manager.cpp pg_blob_iterator.cpp + snapshot_receive_handler.cpp index_kv.cpp heap_chunk_selector.cpp replication_state_machine.cpp diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index c0aa81e5..c7c521f3 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -172,6 +172,48 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s }); } +bool HSHomeObject::local_add_blob_info(pg_id_t const pg_id, BlobInfo const& blob_info) { + HS_PG* hs_pg{nullptr}; + { + shared_lock lock_guard(_pg_lock); + const auto iter = _pg_map.find(pg_id); + RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); + hs_pg = dynamic_cast< HS_PG* >(iter->second.get()); + } + shared< BlobIndexTable > index_table = hs_pg->index_table_; + RELEASE_ASSERT(index_table != nullptr, "Index table not initialized"); + + // Write to index table with key {shard id, blob id} and value {pba}. + auto const [exist_already, status] = add_to_index_table(index_table, blob_info); + LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, exist_already:{}, status:{}, pbas: {}", + blob_info.shard_id, blob_info.blob_id, exist_already, status, blob_info.pbas.to_string()); + if (!exist_already) { + if (status != homestore::btree_status_t::success) { + LOGE("Failed to insert into index table for blob {} err {}", blob_info.blob_id, enum_name(status)); + return false; + } + // The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always + // done ahead of the Index Checkpoint. Hence, if the index already has this entity, whatever durable + // counters updated as part of the update would have been persisted already in PG superblock. So if we were + // to increment now, it will be a duplicate increment, hence ignoring for cases where index already exist + // for this blob put. + + // Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the + // number is already updated in the put_blob call. + hs_pg->durable_entities_update([&blob_info](auto& de) { + auto existing_blob_id = de.blob_sequence_num.load(); + auto next_blob_id = blob_info.blob_id + 1; + while (next_blob_id > existing_blob_id && + // we need update the blob_sequence_num to existing_blob_id+1 so that if leader changes, we can + // still get the up-to-date blob_sequence_num + !de.blob_sequence_num.compare_exchange_weak(existing_blob_id, next_blob_id)) {} + de.active_blob_count.fetch_add(1, std::memory_order_relaxed); + de.total_occupied_blk_count.fetch_add(blob_info.pbas.blk_count(), std::memory_order_relaxed); + }); + } + return true; +} + void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, homestore::MultiBlkId const& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx) { @@ -189,54 +231,19 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis } auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes()))); - shared< BlobIndexTable > index_table; - HS_PG* hs_pg{nullptr}; - { - std::shared_lock lock_guard(_pg_lock); - auto iter = _pg_map.find(msg_header->pg_id); - RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); - hs_pg = static_cast< HS_PG* >(iter->second.get()); - } - - index_table = hs_pg->index_table_; - RELEASE_ASSERT(index_table != nullptr, "Index table not intialized"); + auto const pg_id = msg_header->pg_id; BlobInfo blob_info; blob_info.shard_id = msg_header->shard_id; blob_info.blob_id = blob_id; blob_info.pbas = pbas; - // Write to index table with key {shard id, blob id } and value {pba}. - auto const [exist_already, status] = add_to_index_table(index_table, blob_info); - LOGTRACEMOD(blobmgr, "blob put commit shard_id: {} blob_id: {}, lsn:{}, exist_already:{}, status:{}, pbas: {}", - msg_header->shard_id, blob_id, lsn, exist_already, status, pbas.to_string()); - if (!exist_already) { - if (status != homestore::btree_status_t::success) { - LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status)); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR))); } - return; - } else { - // The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always - // done ahead of the Index Checkpoint. Hence if the index already has this entity, whatever durable counters - // updated as part of the update would have been persisted already in PG superblock. So if we were to - // increment now, it will be a duplicate increment, hence ignorning for cases where index already exist for - // this blob put. - - // Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the - // number is already updated in the put_blob call. - hs_pg->durable_entities_update([&blob_id, &pbas](auto& de) { - auto existing_blob_id = de.blob_sequence_num.load(); - auto next_blob_id = blob_id + 1; - while ((next_blob_id > existing_blob_id) && - // we need update the blob_sequence_num to existing_blob_id+1 so that if leader changes, we can - // still get the up-to-date blob_sequence_num - !de.blob_sequence_num.compare_exchange_weak(existing_blob_id, next_blob_id)) {} - de.active_blob_count.fetch_add(1, std::memory_order_relaxed); - de.total_occupied_blk_count.fetch_add(pbas.blk_count(), std::memory_order_relaxed); - }); - } + bool success = local_add_blob_info(pg_id, blob_info); + + if (ctx) { + ctx->promise_.setValue(success ? BlobManager::Result< BlobInfo >(blob_info) + : folly::makeUnexpected(BlobError(BlobErrorCode::INDEX_ERROR))); } - if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); } } BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset, @@ -342,19 +349,16 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< return folly::makeUnexpected(homestore::ReplServiceError::FAILED); } - std::scoped_lock lock_guard(_shard_lock); - auto shard_iter = _shard_map.find(msg_header->shard_id); - if (shard_iter == _shard_map.end()) { + auto chunk_id = get_shard_chunk(msg_header->shard_id); + if (!chunk_id.has_value()) { LOGW("Received a blob_put on an unknown shard:{}, underlying engine will retry this later", msg_header->shard_id); return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); } - - auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get()); - BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", hs_shard->sb_->chunk_id); + BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", *chunk_id); homestore::blk_alloc_hints hints; - hints.chunk_id_hint = hs_shard->sb_->chunk_id; + hints.chunk_id_hint = *chunk_id; return hints; } diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index 6101ec48..507b4f0f 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -318,22 +318,33 @@ class HSHomeObject : public HomeObjectImpl { // SnapshotReceiverContext is the context used in follower side snapshot receiving. [drafting] The functions is not the final version. struct SnapshotReceiveHandler { - SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id); + enum ErrorCode { + ALLOC_BLK_ERR = 1, + WRITE_DATA_ERR, + INVALID_BLOB_HEADER, + BLOB_DATA_CORRUPTED, + ADD_BLOB_INDEX_ERR, + }; + + constexpr static shard_id_t invalid_shard_id = 0; + constexpr static shard_id_t shard_list_end_marker = ULLONG_MAX; + + SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, homestore::group_id_t group_id, int64_t lsn, + shared< homestore::ReplDev > repl_dev); void process_pg_snapshot_data(ResyncPGMetaData const& pg_meta); - void process_shard_snapshot_data(ResyncShardMetaData const& shard_meta, snp_obj_id_t& obj_id); - void process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blob, snp_obj_id_t& obj_id); + int process_shard_snapshot_data(ResyncShardMetaData const& shard_meta); + int process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, snp_batch_id_t batch_num); + shard_id_t get_next_shard() const; - //snapshot start lsn - int64_t snp_lsn{0}; - shard_id_t shard_cursor{0}; - blob_id_t blob_cursor{0}; - snp_batch_id_t cur_batch_num{0}; - std::vector shard_list; + shard_id_t shard_cursor_{invalid_shard_id}; + snp_batch_id_t cur_batch_num_{0}; + std::vector< shard_id_t > shard_list_; + const int64_t snp_lsn_{0}; HSHomeObject& home_obj_; - homestore::group_id_t group_id_; - pg_id_t pg_id_; - shared< homestore::ReplDev > repl_dev_; + const homestore::group_id_t group_id_; + const pg_id_t pg_id_; + const shared< homestore::ReplDev > repl_dev_; //snapshot info, can be used as a checkpoint for recovery snapshot_info_superblk snp_info_; @@ -357,17 +368,18 @@ class HSHomeObject : public HomeObjectImpl { const homestore::MultiBlkId& blkid) const; // create pg related - PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info); + static PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info); + HS_PG* local_create_pg(shared< homestore::ReplDev > repl_dev, PGInfo pg_info); static std::string serialize_pg_info(const PGInfo& info); static PGInfo deserialize_pg_info(const unsigned char* pg_info_str, size_t size); void add_pg_to_map(unique< HS_PG > hs_pg); // create shard related shard_id_t generate_new_shard_id(pg_id_t pg); - uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t) const; static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size); static std::string serialize_shard_info(const ShardInfo& info); + void local_create_shard(ShardInfo shard_info, homestore::chunk_num_t chunk_num, homestore::blk_count_t blk_count); void add_new_shard_to_map(ShardPtr&& shard); void update_shard_in_map(const ShardInfo& shard_info); @@ -452,6 +464,14 @@ class HSHomeObject : public HomeObjectImpl { */ std::optional< homestore::chunk_num_t > get_shard_chunk(shard_id_t id) const; + /** + * @brief Get the sequence number of the shard from the shard id. + * + * @param shard_id The ID of the shard. + * @return The sequence number of the shard. + */ + static uint64_t get_sequence_num_from_shard_id(uint64_t shard_id); + /** * @brief recover PG and shard from the superblock. * @@ -473,6 +493,7 @@ class HSHomeObject : public HomeObjectImpl { const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx); void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx); + bool local_add_blob_info(pg_id_t pg_id, BlobInfo const &blob_info); homestore::ReplResult< homestore::blk_alloc_hints > blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx); void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size, diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 5a90bb93..b70842af 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include "hs_homeobject.hpp" #include "replication_state_machine.hpp" @@ -93,6 +94,38 @@ PGManager::NullAsyncResult HSHomeObject::do_create_pg(cshared< homestore::ReplDe }); } +HSHomeObject::HS_PG* HSHomeObject::local_create_pg(shared< ReplDev > repl_dev, PGInfo pg_info) { + auto pg_id = pg_info.id; + { + auto lg = shared_lock(_pg_lock); + if (auto it = _pg_map.find(pg_id); it != _pg_map.end()) { + LOGW("PG already exists, pg_id {}", pg_id); + return dynamic_cast< HS_PG* >(it->second.get()); + } + } + + // TODO: select chunks for this pg + + // create index table and pg + auto index_table = create_index_table(); + auto uuid_str = boost::uuids::to_string(index_table->uuid()); + + auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table); + auto ret = hs_pg.get(); + { + scoped_lock lck(index_lock_); + RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found"); + index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table}; + + LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str); + hs_pg->index_table_ = index_table; + // Add to index service, so that it gets cleaned up when index service is shutdown. + hs()->index_service().add_index_table(index_table); + add_pg_to_map(std::move(hs_pg)); + } + return ret; +} + void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& header, shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { @@ -120,28 +153,7 @@ void HSHomeObject::on_create_pg_message_commit(int64_t lsn, sisl::blob const& he } auto pg_info = deserialize_pg_info(serailized_pg_info_buf, serailized_pg_info_size); - auto pg_id = pg_info.id; - if (auto lg = std::shared_lock(_pg_lock); _pg_map.end() != _pg_map.find(pg_id)) { - LOGW("PG already exists, lsn:{}, pg_id {}", lsn, pg_id); - if (ctx) { ctx->promise_.setValue(folly::Unit()); } - return; - } - - // create index table and pg - // TODO create index table during create shard. - auto index_table = create_index_table(); - auto uuid_str = boost::uuids::to_string(index_table->uuid()); - - auto hs_pg = std::make_unique< HS_PG >(std::move(pg_info), std::move(repl_dev), index_table); - std::scoped_lock lock_guard(index_lock_); - RELEASE_ASSERT(index_table_pg_map_.count(uuid_str) == 0, "duplicate index table found"); - index_table_pg_map_[uuid_str] = PgIndexTable{pg_id, index_table}; - - LOGI("Index table created for pg {} uuid {}", pg_id, uuid_str); - hs_pg->index_table_ = index_table; - // Add to index service, so that it gets cleaned up when index service is shutdown. - homestore::hs()->index_service().add_index_table(index_table); - add_pg_to_map(std::move(hs_pg)); + local_create_pg(std::move(repl_dev), pg_info); if (ctx) ctx->promise_.setValue(folly::Unit()); } diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 8378087b..37e4ab5d 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -58,8 +58,8 @@ shard_id_t HSHomeObject::generate_new_shard_id(pg_id_t pgid) { return make_new_shard_id(pgid, new_sequence_num); } -uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id_t) const { - return shard_id_t & (max_shard_num_in_pg() - 1); +uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id) { + return shard_id & (max_shard_num_in_pg() - 1); } std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) { @@ -276,6 +276,33 @@ void HSHomeObject::on_shard_message_rollback(int64_t lsn, sisl::blob const& head } } +void HSHomeObject::local_create_shard(ShardInfo shard_info, homestore::chunk_num_t chunk_num, + homestore::blk_count_t blk_count) { + bool shard_exist = false; + { + scoped_lock lock_guard(_shard_lock); + shard_exist = (_shard_map.find(shard_info.id) != _shard_map.end()); + } + + if (!shard_exist) { + add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, chunk_num)); + // select_specific_chunk() will do something only when we are relaying journal after restart, during the + // runtime flow chunk is already been be mark busy when we write the shard info to the repldev. + chunk_selector_->select_specific_chunk(chunk_num); + } + + // update pg's total_occupied_blk_count + HS_PG* hs_pg{nullptr}; + { + shared_lock lock_guard(_pg_lock); + auto iter = _pg_map.find(shard_info.placement_group); + RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); + hs_pg = static_cast< HS_PG* >(iter->second.get()); + } + hs_pg->durable_entities_update( + [blk_count](auto& de) { de.total_occupied_blk_count.fetch_add(blk_count, std::memory_order_relaxed); }); +} + void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, homestore::MultiBlkId const& blkids, shared< homestore::ReplDev > repl_dev, cintrusive< homestore::repl_req_ctx >& hs_ctx) { @@ -319,31 +346,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom auto shard_info = sb->info; shard_info.lsn = lsn; - bool shard_exist = false; - { - std::scoped_lock lock_guard(_shard_lock); - shard_exist = (_shard_map.find(shard_info.id) != _shard_map.end()); - } - - if (!shard_exist) { - add_new_shard_to_map(std::make_unique< HS_Shard >(shard_info, blkids.chunk_num())); - // select_specific_chunk() will do something only when we are relaying journal after restart, during the - // runtime flow chunk is already been be mark busy when we write the shard info to the repldev. - chunk_selector_->select_specific_chunk(blkids.chunk_num()); - } + local_create_shard(shard_info, blkids.chunk_num(), blkids.blk_count()); if (ctx) { ctx->promise_.setValue(ShardManager::Result< ShardInfo >(shard_info)); } - - // update pg's total_occupied_blk_count - HS_PG* hs_pg{nullptr}; - { - std::shared_lock lock_guard(_pg_lock); - auto iter = _pg_map.find(shard_info.placement_group); - RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); - hs_pg = static_cast< HS_PG* >(iter->second.get()); - } - hs_pg->durable_entities_update([&blkids](auto& de) { - de.total_occupied_blk_count.fetch_add(blkids.blk_count(), std::memory_order_relaxed); - }); LOGI("Commit done for CREATE_SHARD_MSG for shard {}", shard_info.id); break; diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 0efbf63d..96625b09 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -154,7 +154,7 @@ void ReplicationStateMachine::on_replace_member(const homestore::replica_member_ home_object_->on_pg_replace_member(repl_dev()->group_id(), member_out, member_in); } -void ReplicationStateMachine::on_destroy() { +void ReplicationStateMachine::on_destroy(const homestore::group_id_t& group_id) { // TODO:: add the logic to handle destroy LOGI("replica destroyed"); } @@ -266,38 +266,103 @@ int ReplicationStateMachine::read_snapshot_data(std::shared_ptr< homestore::snap void ReplicationStateMachine::write_snapshot_data(std::shared_ptr< homestore::snapshot_context > context, std::shared_ptr< homestore::snapshot_data > snp_data) { - // RELEASE_ASSERT(context != nullptr, "Context null"); - // RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null"); - // auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot(); - // int64_t obj_id = snp_data->offset; - // uint64_t shard_seq_num = obj_id >> 16; - // uint64_t batch_number = obj_id & 0xFFFF; - // - // auto log_str = fmt::format("group={} term={} lsn={} shard_seq={} batch_num={} size={}", - // boost::uuids::to_string(repl_dev()->group_id()), s->get_last_log_term(), - // s->get_last_log_idx(), shard_seq_num, batch_number, snp_data->blob.size()); - // - // if (snp_data->is_last_obj) { - // LOGD("Write snapshot reached is_last_obj true {}", log_str); - // return; - // } - // - // if (obj_id == 0) { - // snp_data->offset = 1 << 16; - // // TODO add metadata. - // return; - // } - // - // auto snp = GetSizePrefixedResyncBlobDataBatch(snp_data->blob.bytes()); - // // TODO Add blob puts - - // if (snp->end_of_batch()) { - // snp_data->offset = (shard_seq_num + 1) << 16; - // } else { - // snp_data->offset = (shard_seq_num << 16) | (batch_number + 1); - // } - // - // LOGT("Read snapshot num_blobs={} end_of_batch={} {}", snp->data_array()->size(), snp->end_of_batch(), log_str); + RELEASE_ASSERT(context != nullptr, "Context null"); + RELEASE_ASSERT(snp_data != nullptr, "Snapshot data null"); + auto s = dynamic_pointer_cast< homestore::nuraft_snapshot_context >(context)->nuraft_snapshot(); + auto obj_id = objId(static_cast< snp_obj_id_t >(snp_data->offset)); + auto r_dev = repl_dev(); + + auto log_suffix = fmt::format("group={} term={} lsn={} shard={} batch_num={} size={}", + uuids::to_string(r_dev->group_id()), s->get_last_log_term(), s->get_last_log_idx(), + obj_id.shard_seq_num, obj_id.batch_id, snp_data->blob.size()); + + if (snp_data->is_last_obj) { + LOGD("Write snapshot reached is_last_obj true {}", log_suffix); + return; + } + + // Check if the snapshot context is same as the current snapshot context. + // If not, drop the previous context and re-init a new one + if (m_snp_rcv_handler && m_snp_rcv_handler->snp_lsn_ != context->get_lsn()) { m_snp_rcv_handler.reset(nullptr); } + + // Check message integrity + // TODO: add a flip here to simulate corrupted message + auto header = r_cast< const SyncMessageHeader* >(snp_data->blob.cbytes()); + if (header->corrupted()) { + LOGE("corrupted message in write_snapshot_data, lsn:{}, obj_id {} shard {} batch {}", s->get_last_log_idx(), + obj_id.value, obj_id.shard_seq_num, obj_id.batch_id); + return; + } + if (auto payload_size = snp_data->blob.size() - sizeof(SyncMessageHeader); payload_size != header->payload_size) { + LOGE("payload size mismatch in write_snapshot_data {} != {}, lsn:{}, obj_id {} shard {} batch {}", payload_size, + header->payload_size, s->get_last_log_idx(), obj_id.value, obj_id.shard_seq_num, obj_id.batch_id); + return; + } + auto data_buf = snp_data->blob.cbytes() + sizeof(SyncMessageHeader); + + if (obj_id.shard_seq_num == 0) { + // PG metadata & shard list message + RELEASE_ASSERT(obj_id.batch_id == 0, "Invalid obj_id"); + + // TODO: Reset all data of current PG - let's resync on a pristine base + + auto pg_data = GetSizePrefixedResyncPGMetaData(data_buf); + m_snp_rcv_handler = std::make_unique< HSHomeObject::SnapshotReceiveHandler >( + *home_object_, pg_data->pg_id(), r_dev->group_id(), context->get_lsn(), r_dev); + m_snp_rcv_handler->process_pg_snapshot_data(*pg_data); + snp_data->offset = + objId(HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->get_next_shard()), 0).value; + LOGD("Write snapshot, processed PG data pg_id:{} {}", pg_data->pg_id(), log_suffix); + return; + } + + RELEASE_ASSERT(m_snp_rcv_handler, + "Snapshot receiver not initialized"); // Here we should have a valid snapshot receiver context + if (obj_id.batch_id == 0) { + // Shard metadata message + RELEASE_ASSERT(obj_id.shard_seq_num != 0, "Invalid obj_id"); + + auto shard_data = GetSizePrefixedResyncShardMetaData(data_buf); + auto ret = m_snp_rcv_handler->process_shard_snapshot_data(*shard_data); + if (ret) { + // Do not proceed, will request for resending the shard data + LOGE("Failed to process shard snapshot data lsn:{} obj_id {} shard {} batch {}, err {}", + s->get_last_log_idx(), obj_id.value, obj_id.shard_seq_num, obj_id.batch_id, ret); + } else { + // Request for the next batch + snp_data->offset = objId(obj_id.shard_seq_num, 1).value; + } + LOGD("Write snapshot, processed shard data shard_seq_num:{} {}", obj_id.shard_seq_num, log_suffix); + return; + } + + // Blob data message + // TODO: enhance error handling for wrong shard id - what may cause this? + RELEASE_ASSERT(obj_id.shard_seq_num == + HSHomeObject::get_sequence_num_from_shard_id(m_snp_rcv_handler->shard_cursor_), + "Shard id not matching with the current shard cursor"); + auto blob_batch = GetSizePrefixedResyncBlobDataBatch(data_buf); + auto ret = m_snp_rcv_handler->process_blobs_snapshot_data(*blob_batch, obj_id.batch_id); + if (ret) { + // Do not proceed, will request for resending the current blob batch + LOGE("Failed to process blob snapshot data lsn:{} obj_id {} shard {} batch {}, err {}", s->get_last_log_idx(), + obj_id.value, obj_id.shard_seq_num, obj_id.batch_id, ret); + return; + } + // Set next obj_id to fetch + if (blob_batch->is_last_batch()) { + auto next_shard = m_snp_rcv_handler->get_next_shard(); + if (next_shard == HSHomeObject::SnapshotReceiveHandler::shard_list_end_marker) { + snp_data->offset = LAST_OBJ_ID; + } else { + snp_data->offset = objId(HSHomeObject::get_sequence_num_from_shard_id(next_shard), 0).value; + } + } else { + snp_data->offset = objId(obj_id.shard_seq_num, obj_id.batch_id + 1).value; + } + + LOGD("Write snapshot, processed blob data shard_seq_num:{} batch_num:{} {}", obj_id.shard_seq_num, obj_id.batch_id, + log_suffix); } void ReplicationStateMachine::free_user_snp_ctx(void*& user_snp_ctx) { diff --git a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp index 25556e29..6fa861d8 100644 --- a/src/lib/homestore_backend/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -173,7 +173,7 @@ class ReplicationStateMachine : public homestore::ReplDevListener { const homestore::replica_member_info& member_in) override; /// @brief Called when the replica is being destroyed by nuraft; - void on_destroy() override; + void on_destroy(const homestore::group_id_t& group_id) override; /// Not Implemented /// @brief Called when the snapshot is being created by nuraft; @@ -188,8 +188,11 @@ class ReplicationStateMachine : public homestore::ReplDevListener { private: HSHomeObject* home_object_{nullptr}; + std::shared_ptr< homestore::snapshot_context > m_snapshot_context; std::mutex m_snapshot_lock; + + std::unique_ptr m_snp_rcv_handler; }; } // namespace homeobject diff --git a/src/lib/homestore_backend/snapshot_receive_handler.cpp b/src/lib/homestore_backend/snapshot_receive_handler.cpp new file mode 100644 index 00000000..840109b3 --- /dev/null +++ b/src/lib/homestore_backend/snapshot_receive_handler.cpp @@ -0,0 +1,210 @@ +#include + +#include "hs_homeobject.hpp" + +#include + +namespace homeobject { +HSHomeObject::SnapshotReceiveHandler::SnapshotReceiveHandler(HSHomeObject& home_obj, pg_id_t pg_id_, + homestore::group_id_t group_id, int64_t lsn, + shared< homestore::ReplDev > repl_dev) : + snp_lsn_(lsn), home_obj_(home_obj), group_id_(group_id), pg_id_(pg_id_), repl_dev_(std::move(repl_dev)) {} + +void HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaData const& pg_meta) { + LOGI("process_pg_snapshot_data pg_id:{}", pg_meta.pg_id()); + + // Init shard list + shard_list_.clear(); + const auto ids = pg_meta.shard_ids(); + for (unsigned int i = 0; i < ids->size(); i++) { + shard_list_.push_back(ids->Get(i)); + } + + // Create local PG + PGInfo pg_info(pg_meta.pg_id()); + std::copy_n(pg_meta.replica_set_uuid()->data(), 16, pg_info.replica_set_uuid.begin()); + for (unsigned int i = 0; i < pg_meta.members()->size(); i++) { + const auto member = pg_meta.members()->Get(i); + uuids::uuid id{}; + std::copy_n(member->uuid()->data(), 16, id.begin()); + PGMember pg_member(id); + pg_member.name = GetString(member->name()); + pg_member.priority = member->priority(); + pg_info.members.insert(pg_member); + } + const auto hs_pg = home_obj_.local_create_pg(repl_dev_, pg_info); + + // Init a base set of pg blob & shard sequence num. Will catch up later on shard/blob creation if not up-to-date + hs_pg->shard_sequence_num_ = pg_meta.shard_seq_num(); + hs_pg->durable_entities_update( + [&pg_meta](auto& de) { de.blob_sequence_num.store(pg_meta.blob_seq_num(), std::memory_order_relaxed); }); +} + +int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShardMetaData const& shard_meta) { + LOGI("process_shard_snapshot_data shard_id:{}", shard_meta.shard_id()); + + // Persist shard meta on chunk data + homestore::chunk_num_t chunk_id = shard_meta.vchunk_id(); // FIXME: vchunk id to chunk id + + shard_info_superblk shard_sb; + shard_sb.info.id = shard_meta.shard_id(); + shard_sb.info.placement_group = shard_meta.pg_id(); + shard_sb.info.state = static_cast< ShardInfo::State >(shard_meta.state()); + shard_sb.info.lsn = shard_meta.created_lsn(); + shard_sb.info.created_time = shard_meta.created_time(); + shard_sb.info.last_modified_time = shard_meta.last_modified_time(); + shard_sb.info.available_capacity_bytes = shard_meta.total_capacity_bytes(); + shard_sb.info.total_capacity_bytes = shard_meta.total_capacity_bytes(); + shard_sb.info.deleted_capacity_bytes = 0; + shard_sb.chunk_id = chunk_id; + + homestore::MultiBlkId blk_id; + const auto hints = home_obj_.chunk_selector()->chunk_to_hints(chunk_id); + auto status = homestore::data_service().alloc_blks( + sisl::round_up(sizeof(shard_sb), homestore::data_service().get_blk_size()), hints, blk_id); + if (status != homestore::BlkAllocStatus::SUCCESS) { + LOGE("Failed to allocate blocks for shard {}", shard_meta.shard_id()); + return ALLOC_BLK_ERR; + } + + const auto ret = homestore::data_service() + .async_write(r_cast< char const* >(&shard_sb), sizeof(shard_sb), blk_id) + .thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > { + // TODO: do we need to update repl_dev metrics? + if (err) { + LOGE("Failed to write shard info to blk_id: {}", blk_id.to_string()); + return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR)); + } + LOGD("Shard info written to blk_id:{}", blk_id.to_string()); + return 0; + }) + .get(); + if (ret) { + LOGE("Failed to write shard info of shard_id {} to blk_id:{}", shard_meta.shard_id(), blk_id.to_string()); + return WRITE_DATA_ERR; + } + + // Now let's create local shard + home_obj_.local_create_shard(shard_sb.info, chunk_id, blk_id.blk_count()); + shard_cursor_ = shard_meta.shard_id(); + cur_batch_num_ = 0; + return 0; +} + +int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs, + const snp_batch_id_t batch_num) { + cur_batch_num_ = batch_num; + for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) { + const auto blob = data_blobs.blob_list()->Get(i); + + // Skip deleted blobs + if (blob->state() == static_cast< uint8_t >(ResyncBlobState::DELETED)) { + LOGD("Skip deleted blob_id:{}", blob->blob_id()); + continue; + } + + // Check duplication to avoid reprocessing + shared< BlobIndexTable > index_table; + { + std::shared_lock lock_guard(home_obj_._pg_lock); + auto iter = home_obj_._pg_map.find(pg_id_); + RELEASE_ASSERT(iter != home_obj_._pg_map.end(), "PG not found"); + index_table = dynamic_cast< HS_PG* >(iter->second.get())->index_table_; + } + RELEASE_ASSERT(index_table != nullptr, "Index table instance null"); + if (home_obj_.get_blob_from_index_table(index_table, shard_cursor_, blob->blob_id())) { + LOGD("Skip already persisted blob_id:{}", blob->blob_id()); + continue; + } + + auto blob_data = blob->data()->Data(); + + // Check integrity of normal blobs + if (blob->state() != static_cast< uint8_t >(ResyncBlobState::CORRUPTED)) { + auto header = r_cast< BlobHeader const* >(blob_data); + if (!header->valid()) { + LOGE("Invalid header found for blob_id {}: [header={}]", blob->blob_id(), header->to_string()); + return INVALID_BLOB_HEADER; + } + std::string user_key = header->user_key_size + ? std::string(r_cast< const char* >(blob_data + sizeof(BlobHeader)), header->user_key_size) + : std::string{}; + + uint8_t computed_hash[BlobHeader::blob_max_hash_len]{}; + home_obj_.compute_blob_payload_hash(header->hash_algorithm, blob_data + header->data_offset, + header->blob_size, uintptr_cast(user_key.data()), header->user_key_size, + computed_hash, BlobHeader::blob_max_hash_len); + if (std::memcmp(computed_hash, header->hash, BlobHeader::blob_max_hash_len) != 0) { + LOGE("Hash mismatch for blob_id {}: header [{}] [computed={:np}]", blob->blob_id(), header->to_string(), + spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len)); + return BLOB_DATA_CORRUPTED; + } + } + + // Alloc & persist blob data + auto chunk_id = home_obj_.get_shard_chunk(shard_cursor_); + RELEASE_ASSERT(chunk_id.has_value(), "Failed to load chunk of current shard_cursor:{}", shard_cursor_); + homestore::blk_alloc_hints hints; + hints.chunk_id_hint = *chunk_id; + + auto data_size = blob->data()->size(); + homestore::MultiBlkId blk_id; + auto status = homestore::data_service().alloc_blks( + sisl::round_up(data_size, homestore::data_service().get_blk_size()), hints, blk_id); + if (status != homestore::BlkAllocStatus::SUCCESS) { + LOGE("Failed to allocate blocks for shard {} blob {}", shard_cursor_, blob->blob_id()); + return ALLOC_BLK_ERR; + } + + auto free_allocated_blks = [blk_id]() { + homestore::data_service().async_free_blk(blk_id).thenValue([blk_id](auto&& err) { + LOGD("Freed blk_id:{} due to failure in adding blob info, err {}", blk_id.to_string(), + err ? err.message() : "nil"); + }); + }; + + const auto ret = homestore::data_service() + .async_write(r_cast< char const* >(blob_data), data_size, blk_id) + .thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > { + // TODO: do we need to update repl_dev metrics? + if (err) { + LOGE("Failed to write shard info to blk_id: {}", blk_id.to_string()); + return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR)); + } + LOGD("Shard info written to blk_id:{}", blk_id.to_string()); + return 0; + }) + .get(); + if (ret) { + LOGE("Failed to write shard info of blob_id {} to blk_id:{}", blob->blob_id(), blk_id.to_string()); + free_allocated_blks(); + return WRITE_DATA_ERR; + } + + // Add local blob info to index & PG + bool success = home_obj_.local_add_blob_info(pg_id_, BlobInfo{shard_cursor_, blob->blob_id(), {0, 0, 0}}); + if (!success) { + LOGE("Failed to add blob info for blob_id:{}", blob->blob_id()); + free_allocated_blks(); + return ADD_BLOB_INDEX_ERR; + } + } + + return 0; +} + +shard_id_t HSHomeObject::SnapshotReceiveHandler::get_next_shard() const { + if (shard_list_.empty()) { return shard_list_end_marker; } + + if (shard_cursor_ == 0) { return shard_list_[0]; } + + for (size_t i = 0; i < shard_list_.size(); ++i) { + if (shard_list_[i] == shard_cursor_) { + return (i + 1 < shard_list_.size()) ? shard_list_[i + 1] : shard_list_end_marker; + } + } + + return invalid_shard_id; +} + +} // namespace homeobject diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index df2c3355..c00f33b5 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -207,3 +207,5 @@ TEST_F(HomeObjectFixture, PGBlobIterator) { // EXPECT_EQ(result.object_off, b.blob.object_off); // } } + +TEST_F(HomeObjectFixture, SnapshotReceiveHandler) {}