diff --git a/conanfile.py b/conanfile.py index edb1343e..ea822d44 100644 --- a/conanfile.py +++ b/conanfile.py @@ -40,8 +40,8 @@ def build_requirements(self): self.build_requires("gtest/1.14.0") def requirements(self): - self.requires("homestore/[~=5, include_prerelease=True]@oss/master") - self.requires("sisl/[~=12, include_prerelease=True]@oss/master") + self.requires("homestore/[~=6.0, include_prerelease=True]@oss/master") + self.requires("sisl/[~=12.1, include_prerelease=True]@oss/master") self.requires("lz4/1.9.4", override=True) def validate(self): diff --git a/src/include/homeobject/pg_manager.hpp b/src/include/homeobject/pg_manager.hpp index 22508ad8..590690c8 100644 --- a/src/include/homeobject/pg_manager.hpp +++ b/src/include/homeobject/pg_manager.hpp @@ -42,13 +42,15 @@ struct PGInfo { struct PGStats { pg_id_t id; peer_id_t replica_set_uuid; - peer_id_t leader_id; // the leader of this PG from my perspective; - uint32_t num_members; // number of members in this PG; - uint32_t total_shards; // shards allocated on this PG (including open shards) - uint32_t open_shards; // active shards on this PG; - uint32_t avail_open_shards; // total number of shards that could be opened on this PG; - uint64_t used_bytes; // total number of bytes used by all shards on this PG; - uint64_t avail_bytes; // total number of bytes available on this PG; + peer_id_t leader_id; // the leader of this PG from my perspective; + uint32_t num_members; // number of members in this PG; + uint32_t total_shards; // shards allocated on this PG (including open shards) + uint32_t open_shards; // active shards on this PG; + uint32_t avail_open_shards; // total number of shards that could be opened on this PG; + uint64_t used_bytes; // total number of bytes used by all shards on this PG; + uint64_t avail_bytes; // total number of bytes available on this PG; + uint64_t num_active_objects; // total number of active objects on this PG; + uint64_t num_tombstone_objects; // total number of tombstone objects on this PG; std::vector< std::tuple< peer_id_t, std::string, uint64_t /* last_commit_lsn */, uint64_t /* last_succ_resp_us_ */ > > members; diff --git a/src/lib/homeobject_impl.hpp b/src/lib/homeobject_impl.hpp index d12d75d1..916ae4d1 100644 --- a/src/lib/homeobject_impl.hpp +++ b/src/lib/homeobject_impl.hpp @@ -57,12 +57,28 @@ struct PG { PG& operator=(PG&& pg) = default; virtual ~PG() = default; + struct DurableEntities { + std::atomic< blob_id_t > blob_sequence_num{0ull}; + std::atomic< uint64_t > active_blob_count{0ull}; + std::atomic< uint64_t > tombstone_blob_count{0ull}; + }; + PGInfo pg_info_; uint64_t shard_sequence_num_{0}; - std::atomic< blob_id_t > blob_sequence_num_{0ull}; + std::atomic< bool > is_dirty_{false}; ShardPtrList shards_; + + void durable_entities_update(auto&& cb, bool dirty = true) { + cb(durable_entities_); + if (dirty) { is_dirty_.store(true, std::memory_order_relaxed); } + } + + DurableEntities const& durable_entities() const { return durable_entities_; } + +protected: + DurableEntities durable_entities_; }; -class HomeObjCPContext; + class HomeObjectImpl : public HomeObject, public BlobManager, public PGManager, diff --git a/src/lib/homestore_backend/CMakeLists.txt b/src/lib/homestore_backend/CMakeLists.txt index 123d083f..c8a546a8 100644 --- a/src/lib/homestore_backend/CMakeLists.txt +++ b/src/lib/homestore_backend/CMakeLists.txt @@ -13,7 +13,8 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE index_kv.cpp heap_chunk_selector.cpp replication_state_machine.cpp - hs_hmobj_cp.cpp + hs_cp_callbacks.cpp + hs_http_manager.cpp $ ) target_link_libraries("${PROJECT_NAME}_homestore" PUBLIC diff --git a/src/lib/homestore_backend/hs_blob_manager.cpp b/src/lib/homestore_backend/hs_blob_manager.cpp index 65649859..f9e25416 100644 --- a/src/lib/homestore_backend/hs_blob_manager.cpp +++ b/src/lib/homestore_backend/hs_blob_manager.cpp @@ -3,7 +3,6 @@ #include "replication_state_machine.hpp" #include "lib/homeobject_impl.hpp" #include "lib/blob_route.hpp" -#include "hs_hmobj_cp.hpp" #include SISL_LOGGING_DECL(blobmgr) @@ -89,15 +88,12 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); auto hs_pg = static_cast< HS_PG* >(iter->second.get()); repl_dev = hs_pg->repl_dev_; - new_blob_id = hs_pg->blob_sequence_num_.fetch_add(1, std::memory_order_relaxed); + hs_pg->durable_entities_update( + [&new_blob_id](auto& de) { new_blob_id = de.blob_sequence_num.fetch_add(1, std::memory_order_relaxed); }, + false /* dirty */); - hs_pg->cache_pg_sb_->blob_sequence_num = hs_pg->blob_sequence_num_.load(); - auto cur_cp = homestore::HomeStore::instance()->cp_mgr().cp_guard(); - auto cp_ctx = s_cast< HomeObjCPContext* >(cur_cp->context(homestore::cp_consumer_t::HS_CLIENT)); - cp_ctx->add_pg_to_dirty_list(hs_pg->cache_pg_sb_); - - RELEASE_ASSERT(new_blob_id < std::numeric_limits< decltype(new_blob_id) >::max(), - "exhausted all available blob ids"); + DEBUG_ASSERT_LT(new_blob_id, std::numeric_limits< decltype(new_blob_id) >::max(), + "exhausted all available blob ids"); } RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null"); @@ -188,36 +184,46 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes()))); shared< BlobIndexTable > index_table; + HS_PG* hs_pg{nullptr}; { std::shared_lock lock_guard(_pg_lock); auto iter = _pg_map.find(msg_header->pg_id); RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); - auto hs_pg = static_cast< HS_PG* >(iter->second.get()); - index_table = hs_pg->index_table_; - RELEASE_ASSERT(index_table != nullptr, "Index table not intialized"); - if (hs_pg->blob_sequence_num_.load() <= blob_id) { - hs_pg->blob_sequence_num_.store(blob_id + 1); - hs_pg->cache_pg_sb_->blob_sequence_num = hs_pg->blob_sequence_num_.load(); - - auto cur_cp = homestore::HomeStore::instance()->cp_mgr().cp_guard(); - auto cp_ctx = s_cast< HomeObjCPContext* >(cur_cp->context(homestore::cp_consumer_t::HS_CLIENT)); - cp_ctx->add_pg_to_dirty_list(hs_pg->cache_pg_sb_); - } + hs_pg = static_cast< HS_PG* >(iter->second.get()); } + index_table = hs_pg->index_table_; + RELEASE_ASSERT(index_table != nullptr, "Index table not intialized"); + BlobInfo blob_info; blob_info.shard_id = msg_header->shard_id; blob_info.blob_id = blob_id; blob_info.pbas = pbas; // Write to index table with key {shard id, blob id } and value {pba}. - auto r = add_to_index_table(index_table, blob_info); - if (r.hasError()) { - LOGE("Failed to insert into index table for blob {} err {}", lsn, r.error()); - if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(r.error())); } - return; + auto const [exist_already, status] = add_to_index_table(index_table, blob_info); + if (!exist_already) { + if (status != homestore::btree_status_t::success) { + LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status)); + if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::INDEX_ERROR)); } + return; + } else { + // The PG superblock (durable entities) will be persisted as part of HS_CLIENT Checkpoint, which is always + // done ahead of the Index Checkpoint. Hence if the index already has this entity, whatever durable counters + // updated as part of the update would have been persisted already in PG superblock. So if we were to + // increment now, it will be a duplicate increment, hence ignorning for cases where index already exist for + // this blob put. + + // Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the + // number is already updated in the put_blob call. + hs_pg->durable_entities_update([&blob_id](auto& de) { + auto existing_blob_id = de.blob_sequence_num.load(); + while ((blob_id > existing_blob_id) && + !de.blob_sequence_num.compare_exchange_weak(existing_blob_id, blob_id)) {} + de.active_blob_count.fetch_add(1, std::memory_order_relaxed); + }); + } } - if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); } } @@ -304,8 +310,8 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, }); } -homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, - cintrusive< homestore::repl_req_ctx >& hs_ctx) { +homestore::ReplResult< homestore::blk_alloc_hints > +HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& hs_ctx) { repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr}; if (hs_ctx && hs_ctx->is_proposer) { ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get(); @@ -315,12 +321,16 @@ homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob if (msg_header->corrupted()) { LOGE("replication message header is corrupted with crc error shard:{}", msg_header->shard_id); if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::CHECKSUM_MISMATCH)); } - return {}; + return folly::makeUnexpected(homestore::ReplServiceError::FAILED); } std::scoped_lock lock_guard(_shard_lock); auto shard_iter = _shard_map.find(msg_header->shard_id); - RELEASE_ASSERT(shard_iter != _shard_map.end(), "Couldnt find shard id"); + if (shard_iter == _shard_map.end()) { + LOGW("Received a blob_put on an unknown shard, underlying engine will retry this later"); + return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); + } + auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get()); BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", hs_shard->sb_->chunk_id); @@ -382,12 +392,14 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis shared< BlobIndexTable > index_table; shared< homestore::ReplDev > repl_dev; + HSHomeObject::HS_PG* hs_pg{nullptr}; { std::shared_lock lock_guard(_pg_lock); auto iter = _pg_map.find(msg_header->pg_id); RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); - index_table = static_cast< HS_PG* >(iter->second.get())->index_table_; - repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_; + hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get()); + index_table = hs_pg->index_table_; + repl_dev = hs_pg->repl_dev_; RELEASE_ASSERT(index_table != nullptr, "Index table not intialized"); RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null"); } @@ -409,7 +421,13 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis } auto& multiBlks = r.value(); - if (multiBlks != tombstone_pbas) { repl_dev->async_free_blks(lsn, multiBlks); } + if (multiBlks != tombstone_pbas) { + repl_dev->async_free_blks(lsn, multiBlks); + hs_pg->durable_entities_update([](auto& de) { + de.active_blob_count.fetch_sub(1, std::memory_order_relaxed); + de.tombstone_blob_count.fetch_add(1, std::memory_order_relaxed); + }); + } if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); } } diff --git a/src/lib/homestore_backend/hs_cp_callbacks.cpp b/src/lib/homestore_backend/hs_cp_callbacks.cpp new file mode 100644 index 00000000..5e4e306d --- /dev/null +++ b/src/lib/homestore_backend/hs_cp_callbacks.cpp @@ -0,0 +1,58 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include +#include +#include "hs_homeobject.hpp" + +using homestore::CP; +using homestore::CPCallbacks; +using homestore::CPContext; + +namespace homeobject { + +std::unique_ptr< CPContext > HSHomeObject::MyCPCallbacks::on_switchover_cp(CP* cur_cp, CP* new_cp) { return nullptr; } + +// when cp_flush is called, it means that all the dirty candidates are already in the dirty list. +// new dirty candidates will arrive on next cp's context. +folly::Future< bool > HSHomeObject::MyCPCallbacks::cp_flush(CP* cp) { + std::vector< HSHomeObject::HS_PG* > dirty_pg_list; + dirty_pg_list.reserve(home_obj_._pg_map.size()); + { + std::shared_lock lock_guard(home_obj_._pg_lock); + for (auto const& [id, pg] : home_obj_._pg_map) { + auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get()); + + // All dirty durable entries are updated in the superblk. We persist outside the pg_lock + if (!hs_pg->is_dirty_.exchange(false)) { continue; } + + hs_pg->pg_sb_->blob_sequence_num = hs_pg->durable_entities().blob_sequence_num.load(); + hs_pg->pg_sb_->active_blob_count = hs_pg->durable_entities().active_blob_count.load(); + hs_pg->pg_sb_->tombstone_blob_count = hs_pg->durable_entities().tombstone_blob_count.load(); + dirty_pg_list.push_back(hs_pg); + } + } + + for (auto& hs_pg : dirty_pg_list) { + hs_pg->pg_sb_.write(); + } + return folly::makeFuture< bool >(true); +} + +void HSHomeObject::MyCPCallbacks::cp_cleanup(CP* cp) {} + +int HSHomeObject::MyCPCallbacks::cp_progress_percent() { return 0; } + +} // namespace homeobject diff --git a/src/lib/homestore_backend/hs_hmobj_cp.cpp b/src/lib/homestore_backend/hs_hmobj_cp.cpp deleted file mode 100644 index 8141e3ba..00000000 --- a/src/lib/homestore_backend/hs_hmobj_cp.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/********************************************************************************* - * Modifications Copyright 2017-2019 eBay Inc. - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - *********************************************************************************/ -#include -#include "hs_hmobj_cp.hpp" - -namespace homeobject { - -std::unique_ptr< CPContext > HomeObjCPCallbacks::on_switchover_cp(CP* cur_cp, CP* new_cp) { - return std::make_unique< HomeObjCPContext >(new_cp); -} - -// when cp_flush is called, it means that all the dirty candidates are already in the dirty list. -// new dirty candidates will arrive on next cp's context. -folly::Future< bool > HomeObjCPCallbacks::cp_flush(CP* cp) { - auto cp_ctx = s_cast< HomeObjCPContext* >(cp->context(homestore::cp_consumer_t::HS_CLIENT)); - - // start to flush all dirty candidates. - // no need to take the lock as no more dirty candidate is going to be added to this context when we are here; - for (auto it = cp_ctx->pg_dirty_list_.begin(); it != cp_ctx->pg_dirty_list_.end(); ++it) { - auto id = it->first; - // auto pg_sb = it->second.get(); - auto pg_sb = it->second; - auto const pit = cp_ctx->pg_sb_.find(id); -#if 0 - // releax this assert if HS_PG won't write pg_sb first before cp_flush; - RELEASE_ASSERT(pit != cp_ctx->pg_sb_.end(), "pg_sb_ should have this pg_id"); -#endif - if (pit == cp_ctx->pg_sb_.end()) { - cp_ctx->pg_sb_[id] = - homestore::superblk< HSHomeObject::pg_info_superblk >(HSHomeObject::pg_info_superblk::name()); - cp_ctx->pg_sb_[id].create(pg_sb->size()); - } - - // reuse the superblk in the cp context if the same pg has been dirtied in same cp; - - // copy the dirty buffer to the superblk; - cp_ctx->pg_sb_[id].get()->copy(*pg_sb); - - // write to disk; - cp_ctx->pg_sb_[id].write(); - } - - cp_ctx->complete(true); - - return folly::makeFuture< bool >(true); -} - -void HomeObjCPCallbacks::cp_cleanup(CP* cp) {} - -int HomeObjCPCallbacks::cp_progress_percent() { return 0; } - -HomeObjCPContext::HomeObjCPContext(CP* cp) : CPContext(cp) { pg_dirty_list_.clear(); } - -void HomeObjCPContext::add_pg_to_dirty_list(HSHomeObject::pg_info_superblk* pg_sb) { - // this will be called in io path, so take the lock to protect the dirty list; - std::scoped_lock lock_guard(dl_mtx_); - HSHomeObject::pg_info_superblk* sb_copy{nullptr}; - if (pg_dirty_list_.find(pg_sb->id) == pg_dirty_list_.end()) { - sb_copy = (HSHomeObject::pg_info_superblk*)malloc(pg_sb->size()); - } else { - sb_copy = pg_dirty_list_[pg_sb->id]; - } - - // here we do copy instead of using caller's pg_sb directly because we don't want to every I/O to allocate the same - // piece of memory. Instead, choose to copy the pg_sb which is a very small size; - sb_copy->copy(*pg_sb); - pg_dirty_list_.emplace(pg_sb->id, sb_copy); -} -std::mutex HomeObjCPContext::s_mtx_; -std::unordered_map< pg_id_t, homestore::superblk< HSHomeObject::pg_info_superblk > > HomeObjCPContext::pg_sb_; -} // namespace homeobject diff --git a/src/lib/homestore_backend/hs_hmobj_cp.hpp b/src/lib/homestore_backend/hs_hmobj_cp.hpp deleted file mode 100644 index 2d7308d5..00000000 --- a/src/lib/homestore_backend/hs_hmobj_cp.hpp +++ /dev/null @@ -1,99 +0,0 @@ -/********************************************************************************* - * Modifications Copyright 2017-2019 eBay Inc. - * - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed - * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR - * CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - *********************************************************************************/ -#pragma once -#include -#include -#include -#include -#include - -#include "hs_homeobject.hpp" - -using homestore::CPCallbacks; -using homestore::CPContext; -namespace homestore { -class CP; -}; // namespace homestore - -using homestore::CP; - -namespace homeobject { - -class HSHomeObject; - -class HomeObjCPCallbacks : public CPCallbacks { -public: - HomeObjCPCallbacks(HSHomeObject* home_obj_ptr) : home_obj_(home_obj_ptr){}; - virtual ~HomeObjCPCallbacks() = default; - -public: - std::unique_ptr< CPContext > on_switchover_cp(CP* cur_cp, CP* new_cp) override; - folly::Future< bool > cp_flush(CP* cp) override; - void cp_cleanup(CP* cp) override; - int cp_progress_percent() override; - -private: - HSHomeObject* home_obj_{nullptr}; // it is a raw pointer because HSHomeObject triggers shutdown in its destructor, - // holding a shared_ptr will cause a shutdown deadlock. -}; - -// -// This is a per_cp context for home object. -// When a new CP is created, a new HomeObjCPContext is created by CP Manager; -// CP consumer doesn't need to free the dirty list inside this context as it will be automatically freed when this cp is -// completed (goes out of life cycle); -// -class HomeObjCPContext : public CPContext { -public: - HomeObjCPContext(CP* cp); - virtual ~HomeObjCPContext() { - for (auto x : pg_dirty_list_) { - free(x.second); - } - }; - - /** - * @brief Adds the PG sb to the dirty list. - * - * This function adds the given pg superblock to the dirty list, indicating that it has been modified and needs to - * be written back to storage. If the same pg (identified by its id) has been added before in the same CP, it will - * update the same dirty buffer in the dirty list. Caller doesn't need to worry about whether same pg sb has been - * added or not. - * - * Memory: - * This function will allocate memory for the pg sb if it is the first time the pg sb is added; - * otherwise, it will reuse the memory allocated before. - * - * @param pg_sb A pointer to the page superblock to be added to the dirty list. - */ - void add_pg_to_dirty_list(HSHomeObject::pg_info_superblk* pg_sb); - - static void init_pg_sb(homestore::superblk< HSHomeObject::pg_info_superblk >&& sb) { - std::scoped_lock lock_guard(s_mtx_); - pg_sb_[sb->id] = std::move(sb); // move the sb to the map; - }; - - //////////////// Per-CP instance members //////////////// - std::mutex dl_mtx_; // mutex to protect dirty list - std::unordered_map< pg_id_t, HSHomeObject::pg_info_superblk* > pg_dirty_list_; - - //////////////// Shared by all CPs //////////////// - static std::mutex s_mtx_; // mutex to protect pg_sb_ - // static so that only one superblk instance can write to metablk; - static std::unordered_map< pg_id_t, homestore::superblk< HSHomeObject::pg_info_superblk > > pg_sb_; -}; - -} // namespace homeobject diff --git a/src/lib/homestore_backend/hs_homeobject.cpp b/src/lib/homestore_backend/hs_homeobject.cpp index 37a51b5c..8cd9824e 100644 --- a/src/lib/homestore_backend/hs_homeobject.cpp +++ b/src/lib/homestore_backend/hs_homeobject.cpp @@ -13,9 +13,9 @@ #include #include "hs_homeobject.hpp" #include "heap_chunk_selector.h" +#include "hs_http_manager.hpp" #include "index_kv.hpp" #include "hs_backend_config.hpp" -#include "hs_hmobj_cp.hpp" #include "replication_state_machine.hpp" namespace homeobject { @@ -102,7 +102,13 @@ void HSHomeObject::init_homestore() { RELEASE_ASSERT(app, "HomeObjectApplication lifetime unexpected!"); LOGI("Starting iomgr with {} threads, spdk: {}", app->threads(), false); - ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = app->threads(), .is_spdk = app->spdk_mode()}); + ioenvironment.with_iomgr(iomgr::iomgr_params{.num_threads = app->threads(), .is_spdk = app->spdk_mode()}) + .with_http_server(); + + // TODO: Fixme. This is a hack to restart http server. We should remove this once IOEnvironment has an api + // called stop_iomgr, stop_http_server. Until that this restart call allows us to cleanup previous instances + ioenvironment.restart_http_server(); + http_mgr_ = std::make_unique< HttpManager >(*this); /// TODO Where should this come from? const uint64_t app_mem_size = 2 * Gi; @@ -202,7 +208,7 @@ void HSHomeObject::init_cp() { using namespace homestore; // Register to CP for flush dirty buffers; HomeStore::instance()->cp_mgr().register_consumer(cp_consumer_t::HS_CLIENT, - std::move(std::make_unique< HomeObjCPCallbacks >(this))); + std::move(std::make_unique< MyCPCallbacks >(*this))); } // void HSHomeObject::trigger_timed_events() { persist_pg_sb(); } diff --git a/src/lib/homestore_backend/hs_homeobject.hpp b/src/lib/homestore_backend/hs_homeobject.hpp index d952da79..1f9daaf8 100644 --- a/src/lib/homestore_backend/hs_homeobject.hpp +++ b/src/lib/homestore_backend/hs_homeobject.hpp @@ -22,7 +22,7 @@ namespace homeobject { class BlobRouteKey; class BlobRouteValue; using BlobIndexTable = homestore::IndexTable< BlobRouteKey, BlobRouteValue >; -class HomeObjCPContext; +class HttpManager; static constexpr uint64_t io_align{512}; PGError toPgError(homestore::ReplServiceError const&); @@ -78,7 +78,9 @@ class HSHomeObject : public HomeObjectImpl { peer_id_t replica_set_uuid; homestore::uuid_t index_table_uuid; blob_id_t blob_sequence_num; - pg_members members[1]; // ISO C++ forbids zero-size array + uint64_t active_blob_count; // Total number of active blobs + uint64_t tombstone_blob_count; // Total number of tombstones + pg_members members[1]; // ISO C++ forbids zero-size array uint32_t size() const { return sizeof(pg_info_superblk) + ((num_members - 1) * sizeof(pg_members)); } static std::string name() { return _pg_meta_name; } @@ -120,26 +122,75 @@ class HSHomeObject : public HomeObjectImpl { }; #pragma pack() +public: + class MyCPCallbacks : public homestore::CPCallbacks { + public: + MyCPCallbacks(HSHomeObject& ho) : home_obj_{ho} {}; + virtual ~MyCPCallbacks() = default; + + public: + std::unique_ptr< homestore::CPContext > on_switchover_cp(homestore::CP* cur_cp, homestore::CP* new_cp) override; + folly::Future< bool > cp_flush(homestore::CP* cp) override; + void cp_cleanup(homestore::CP* cp) override; + int cp_progress_percent() override; + + private: + HSHomeObject& home_obj_; + }; + struct HS_PG : public PG { - // Only accessible during PG creation, after that it is not accessible. + struct PGMetrics : public sisl::MetricsGroup { + public: + PGMetrics(HS_PG const& pg) : + sisl::MetricsGroup{"PG", boost::uuids::to_string(pg.pg_info_.replica_set_uuid)}, pg_(pg) { + // We use replica_set_uuid instead of pg_id for metrics to make it globally unique to allow aggregating + // across multiple nodes + REGISTER_GAUGE(shard_count, "Number of shards"); + REGISTER_GAUGE(open_shard_count, "Number of open shards"); + REGISTER_GAUGE(active_blob_count, "Number of valid blobs present"); + REGISTER_GAUGE(tombstone_blob_count, "Number of tombstone blobs which can be garbage collected"); + REGISTER_COUNTER(total_user_key_size, "Total user key size provided", + sisl::_publish_as::publish_as_gauge); + REGISTER_COUNTER(total_occupied_space, + "Total Size occupied (including padding, user_key, blob) rounded to block size", + sisl::_publish_as::publish_as_gauge); + + REGISTER_HISTOGRAM(blobs_per_shard, + "Distribution of blobs per shard"); // TODO: Add a bucket for blob sizes + REGISTER_HISTOGRAM(actual_blob_size, "Distribution of actual blob sizes"); + + register_me_to_farm(); + attach_gather_cb(std::bind(&PGMetrics::on_gather, this)); + } + ~PGMetrics() { deregister_me_from_farm(); } + PGMetrics(const PGMetrics&) = delete; + PGMetrics(PGMetrics&&) noexcept = delete; + PGMetrics& operator=(const PGMetrics&) = delete; + PGMetrics& operator=(PGMetrics&&) noexcept = delete; + + void on_gather() { + GAUGE_UPDATE(*this, shard_count, pg_.total_shards()); + GAUGE_UPDATE(*this, open_shard_count, pg_.open_shards()); + GAUGE_UPDATE(*this, active_blob_count, + pg_.durable_entities().active_blob_count.load(std::memory_order_relaxed)); + GAUGE_UPDATE(*this, tombstone_blob_count, + pg_.durable_entities().tombstone_blob_count.load(std::memory_order_relaxed)); + } + + private: + HS_PG const& pg_; + }; + + public: homestore::superblk< pg_info_superblk > pg_sb_; - pg_info_superblk* cache_pg_sb_{nullptr}; // always up-to-date; shared< homestore::ReplDev > repl_dev_; - std::optional< homestore::chunk_num_t > any_allocated_chunk_id_{}; std::shared_ptr< BlobIndexTable > index_table_; + PGMetrics metrics_; HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table); HS_PG(homestore::superblk< pg_info_superblk >&& sb, shared< homestore::ReplDev > rdev); - - void init_cp(); - - virtual ~HS_PG() { - if (cache_pg_sb_) { - free(cache_pg_sb_); - cache_pg_sb_ = nullptr; - } - } + ~HS_PG() override = default; static PGInfo pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb); @@ -221,6 +272,7 @@ class HSHomeObject : public HomeObjectImpl { private: shared< HeapChunkSelector > chunk_selector_; + unique< HttpManager > http_mgr_; bool recovery_done_{false}; static constexpr size_t max_zpad_bufs = _data_block_size / io_align; @@ -237,7 +289,7 @@ class HSHomeObject : public HomeObjectImpl { // create shard related shard_id_t generate_new_shard_id(pg_id_t pg); - uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t); + uint64_t get_sequence_num_from_shard_id(uint64_t shard_id_t) const; static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size); static std::string serialize_shard_info(const ShardInfo& info); @@ -311,12 +363,13 @@ class HSHomeObject : public HomeObjectImpl { * @brief Returns any chunk number for the given pg ID. * * @param pg The pg ID to get the chunk number for. - * @return An optional chunk number if the pg ID exists, otherwise std::nullopt. + * @return A tuple of . */ - std::optional< homestore::chunk_num_t > get_any_chunk_id(pg_id_t const pg); + std::tuple< bool, bool, homestore::chunk_num_t > get_any_chunk_id(pg_id_t pg); cshared< HeapChunkSelector > chunk_selector() const { return chunk_selector_; } + //////////// Called by internal classes. These are not Public APIs /////////////////// bool on_pre_commit_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >&); void on_rollback_shard_msg(int64_t lsn, sisl::blob const& header, sisl::blob const& key, @@ -329,17 +382,19 @@ class HSHomeObject : public HomeObjectImpl { const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx); void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, cintrusive< homestore::repl_req_ctx >& hs_ctx); - homestore::blk_alloc_hints blob_put_get_blk_alloc_hints(sisl::blob const& header, - cintrusive< homestore::repl_req_ctx >& ctx); + homestore::ReplResult< homestore::blk_alloc_hints > + blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx); void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size, const uint8_t* user_key_bytes, size_t user_key_size, uint8_t* hash_bytes, size_t hash_len) const; - std::shared_ptr< BlobIndexTable > create_index_table(); - std::shared_ptr< BlobIndexTable > recover_index_table(homestore::superblk< homestore::index_table_sb >&& sb); - BlobManager::NullResult add_to_index_table(shared< BlobIndexTable > index_table, const BlobInfo& blob_info); +private: + std::shared_ptr< BlobIndexTable > create_index_table(); + + std::pair< bool, homestore::btree_status_t > add_to_index_table(shared< BlobIndexTable > index_table, + const BlobInfo& blob_info); BlobManager::Result< homestore::MultiBlkId > get_blob_from_index_table(shared< BlobIndexTable > index_table, shard_id_t shard_id, blob_id_t blob_id) const; diff --git a/src/lib/homestore_backend/hs_http_manager.cpp b/src/lib/homestore_backend/hs_http_manager.cpp new file mode 100644 index 00000000..c243e436 --- /dev/null +++ b/src/lib/homestore_backend/hs_http_manager.cpp @@ -0,0 +1,164 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include +#include +#include + +#include "hs_http_manager.hpp" +#include "hs_homeobject.hpp" + +namespace homeobject { + +HttpManager::HttpManager(HSHomeObject& ho) : ho_(ho) { + using namespace Pistache; + using namespace Pistache::Rest; + + LOGINFO("Starting HomeObject HTTP Manager"); + auto http_server = ioenvironment.get_http_server(); + try { + http_server->setup_route(Http::Method::Get, "/api/v1/version", Routes::bind(&HttpManager::get_version, this)); + http_server->setup_route(Http::Method::Get, "/api/v1/getMetrics", + Rest::Routes::bind(&HttpManager::get_metrics, this)); + http_server->setup_route(Http::Method::Get, "/metrics", + Rest::Routes::bind(&HttpManager::get_prometheus_metrics, this), iomgr::url_t::safe); + http_server->setup_route(Http::Method::Get, "/api/v1/getObjLife", + Routes::bind(&HttpManager::get_obj_life, this)); + http_server->setup_route(Http::Method::Get, "/api/v1/getLogLevel", + Routes::bind(&HttpManager::get_log_level, this)); + http_server->setup_route(Http::Method::Post, "/api/v1/setLogLevel", + Routes::bind(&HttpManager::set_log_level, this)); + http_server->setup_route(Http::Method::Get, "/api/v1/mallocStats", + Routes::bind(&HttpManager::get_malloc_stats, this)); + http_server->setup_route(Http::Method::Get, "/api/v1/getConfig", Routes::bind(&HttpManager::get_config, this)); + http_server->setup_route(Http::Method::Post, "/api/v1/reloadConfig", + Routes::bind(&HttpManager::reload_dynamic_config, this)); +#ifdef _PRERELEASE + http_server->setup_route(Http::Method::Post, "/api/v1/crashSystem", + Routes::bind(&HttpManager::crash_system, this)); +#endif + http_server->start(); + } catch (const std::runtime_error& e) { LOGWARN("{}", e.what()) } +} + +void HttpManager::get_version(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + auto vers = sisl::VersionMgr::getVersions(); + std::stringstream ss; + for (auto const& v : vers) { + ss << v.first << ": " << v.second << "; "; + } + response.send(Pistache::Http::Code::Ok, ss.str()); +} + +void HttpManager::get_metrics(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + response.send(Pistache::Http::Code::Ok, sisl::MetricsFarm::getInstance().get_result_in_json_string()); +} + +void HttpManager::get_prometheus_metrics(const Pistache::Rest::Request& request, + Pistache::Http::ResponseWriter response) { + response.send(Pistache::Http::Code::Ok, sisl::MetricsFarm::getInstance().report(sisl::ReportFormat::kTextFormat)); +} + +void HttpManager::get_obj_life(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + nlohmann::json j; + sisl::ObjCounterRegistry::foreach ([&j](const std::string& name, int64_t created, int64_t alive) { + std::stringstream ss; + ss << "created=" << created << " alive=" << alive; + j[name] = ss.str(); + }); + response.send(Pistache::Http::Code::Ok, j.dump()); +} + +void HttpManager::set_log_level(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + std::string logmodule; + const auto _new_log_module{request.query().get("logmodule")}; + if (_new_log_module) { logmodule = _new_log_module.value(); } + + const auto _new_log_level{request.query().get("loglevel")}; + if (!_new_log_level) { + response.send(Pistache::Http::Code::Bad_Request, "Invalid loglevel param!"); + return; + } + auto new_log_level = _new_log_level.value(); + + std::string resp; + if (logmodule.empty()) { + sisl::logging::SetAllModuleLogLevel(spdlog::level::from_str(new_log_level)); + resp = sisl::logging::GetAllModuleLogLevel().dump(2); + } else { + sisl::logging::SetModuleLogLevel(logmodule, spdlog::level::from_str(new_log_level)); + resp = std::string("logmodule ") + logmodule + " level set to " + + spdlog::level::to_string_view(sisl::logging::GetModuleLogLevel(logmodule)).data(); + } + + response.send(Pistache::Http::Code::Ok, resp); +} + +void HttpManager::get_log_level(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + std::string logmodule; + const auto _new_log_module{request.query().get("logmodule")}; + if (_new_log_module) { logmodule = _new_log_module.value(); } + + std::string resp; + if (logmodule.empty()) { + resp = sisl::logging::GetAllModuleLogLevel().dump(2); + } else { + resp = std::string("logmodule ") + logmodule + + " level = " + spdlog::level::to_string_view(sisl::logging::GetModuleLogLevel(logmodule)).data(); + } + response.send(Pistache::Http::Code::Ok, resp); +} + +void HttpManager::get_malloc_stats(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + response.send(Pistache::Http::Code::Ok, sisl::get_malloc_stats_detailed().dump(2)); +} + +void HttpManager::get_config(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + nlohmann::json j = sisl::SettingsFactoryRegistry::instance().get_json(); + response.send(Pistache::Http::Code::Ok, j.dump(2)); +} + +void HttpManager::reload_dynamic_config(const Pistache::Rest::Request& request, + Pistache::Http::ResponseWriter response) { + bool restart_needed = sisl::SettingsFactoryRegistry::instance().reload_all(); + response.send(Pistache::Http::Code::Ok, + fmt::format("All config reloaded, is app restarted {}\n", (restart_needed ? "true" : "false"))); + if (restart_needed) { + LOGINFO("Restarting HomeObject because of config change which needed a restart"); + std::this_thread::sleep_for(std::chrono::microseconds{1000}); + std::raise(SIGTERM); + } +} + +#ifdef _PRERELEASE +void HttpManager::crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response) { + std::string crash_type; + const auto _crash_type{request.query().get("type")}; + if (_crash_type) { crash_type = _crash_type.value(); } + + std::string resp = ""; + if (crash_type.empty() || boost::iequals(crash_type, "assert")) { + RELEASE_ASSERT(0, "Fake Assert in response to an http request"); + } else if (boost::iequals(crash_type, "segv")) { + int* x{nullptr}; + LOGINFO("Simulating a segv with dereferencing nullptr={}", *x); + } else { + resp = "crash type " + crash_type + " not supported yet"; + } + response.send(Pistache::Http::Code::Ok, resp); +} +#endif + +} // namespace homeobject diff --git a/src/lib/homestore_backend/hs_http_manager.hpp b/src/lib/homestore_backend/hs_http_manager.hpp new file mode 100644 index 00000000..c7dd47bd --- /dev/null +++ b/src/lib/homestore_backend/hs_http_manager.hpp @@ -0,0 +1,44 @@ +/********************************************************************************* + * Modifications Copyright 2017-2019 eBay Inc. + * + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed + * under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR + * CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + *********************************************************************************/ +#include +#include + +namespace homeobject { +class HSHomeObject; + +class HttpManager { +public: + HttpManager(HSHomeObject& ho); + +private: + void get_version(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void get_metrics(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void get_prometheus_metrics(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void get_obj_life(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void set_log_level(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void get_log_level(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void get_malloc_stats(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void get_config(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + void reload_dynamic_config(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); + +#ifdef _PRERELEASE + void crash_system(const Pistache::Rest::Request& request, Pistache::Http::ResponseWriter response); +#endif + +private: + HSHomeObject& ho_; +}; +} // namespace homeobject \ No newline at end of file diff --git a/src/lib/homestore_backend/hs_pg_manager.cpp b/src/lib/homestore_backend/hs_pg_manager.cpp index 89c8180e..e5ef46fb 100644 --- a/src/lib/homestore_backend/hs_pg_manager.cpp +++ b/src/lib/homestore_backend/hs_pg_manager.cpp @@ -3,7 +3,6 @@ #include #include "hs_homeobject.hpp" #include "replication_state_machine.hpp" -#include "hs_hmobj_cp.hpp" using namespace homestore; namespace homeobject { @@ -226,12 +225,18 @@ PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superbl } HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, shared< BlobIndexTable > index_table) : - PG{std::move(info)}, pg_sb_{_pg_meta_name}, repl_dev_{std::move(rdev)}, index_table_(index_table) { + PG{std::move(info)}, + pg_sb_{_pg_meta_name}, + repl_dev_{std::move(rdev)}, + index_table_{std::move(index_table)}, + metrics_{*this} { pg_sb_.create(sizeof(pg_info_superblk) + ((pg_info_.members.size() - 1) * sizeof(pg_members))); pg_sb_->id = pg_info_.id; pg_sb_->num_members = pg_info_.members.size(); pg_sb_->replica_set_uuid = repl_dev_->group_id(); pg_sb_->index_table_uuid = index_table_->uuid(); + pg_sb_->active_blob_count = 0; + pg_sb_->tombstone_blob_count = 0; uint32_t i{0}; for (auto const& m : pg_info_.members) { @@ -241,22 +246,14 @@ HSHomeObject::HS_PG::HS_PG(PGInfo info, shared< homestore::ReplDev > rdev, share ++i; } pg_sb_.write(); - init_cp(); -} - -void HSHomeObject::HS_PG::init_cp() { - cache_pg_sb_ = (pg_info_superblk*)malloc(pg_sb_->size()); - cache_pg_sb_->copy(*(pg_sb_.get())); - HomeObjCPContext::init_pg_sb(std::move(pg_sb_)); - - // pg_sb_ will not be accessible after this point. } HSHomeObject::HS_PG::HS_PG(homestore::superblk< HSHomeObject::pg_info_superblk >&& sb, shared< homestore::ReplDev > rdev) : - PG{pg_info_from_sb(sb)}, pg_sb_{std::move(sb)}, repl_dev_{std::move(rdev)} { - blob_sequence_num_ = pg_sb_->blob_sequence_num; - init_cp(); + PG{pg_info_from_sb(sb)}, pg_sb_{std::move(sb)}, repl_dev_{std::move(rdev)}, metrics_{*this} { + durable_entities_.blob_sequence_num = pg_sb_->blob_sequence_num; + durable_entities_.active_blob_count = pg_sb_->active_blob_count; + durable_entities_.tombstone_blob_count = pg_sb_->tombstone_blob_count; } uint32_t HSHomeObject::HS_PG::total_shards() const { return shards_.size(); } @@ -272,17 +269,6 @@ std::optional< uint32_t > HSHomeObject::HS_PG::dev_hint(cshared< HeapChunkSelect return hint.pdev_id_hint; } -void HSHomeObject::persist_pg_sb() { -#if 0 - auto lg = std::shared_lock(_pg_lock); - for (auto& [_, pg] : _pg_map) { - auto hs_pg = static_cast< HS_PG* >(pg.get()); - hs_pg->pg_sb_->blob_sequence_num = hs_pg->blob_sequence_num_; - hs_pg->pg_sb_.write(); - } -#endif -} - bool HSHomeObject::_get_stats(pg_id_t id, PGStats& stats) const { auto lg = std::shared_lock(_pg_lock); auto it = _pg_map.find(id); @@ -297,6 +283,8 @@ bool HSHomeObject::_get_stats(pg_id_t id, PGStats& stats) const { stats.total_shards = hs_pg->total_shards(); stats.open_shards = hs_pg->open_shards(); stats.leader_id = hs_pg->repl_dev_->get_leader_id(); + stats.num_active_objects = hs_pg->durable_entities().active_blob_count.load(std::memory_order_relaxed); + stats.num_tombstone_objects = hs_pg->durable_entities().tombstone_blob_count.load(std::memory_order_relaxed); auto const replication_status = hs_pg->repl_dev_->get_replication_status(); for (auto const& m : hs_pg->pg_info_.members) { diff --git a/src/lib/homestore_backend/hs_shard_manager.cpp b/src/lib/homestore_backend/hs_shard_manager.cpp index 9e345c20..7680c059 100644 --- a/src/lib/homestore_backend/hs_shard_manager.cpp +++ b/src/lib/homestore_backend/hs_shard_manager.cpp @@ -58,7 +58,7 @@ shard_id_t HSHomeObject::generate_new_shard_id(pg_id_t pgid) { return make_new_shard_id(pgid, new_sequence_num); } -uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id_t) { +uint64_t HSHomeObject::get_sequence_num_from_shard_id(uint64_t shard_id_t) const { return shard_id_t & (max_shard_num_in_pg() - 1); } @@ -325,22 +325,22 @@ std::optional< homestore::chunk_num_t > HSHomeObject::get_shard_chunk(shard_id_t return std::make_optional< homestore::chunk_num_t >(hs_shard->sb_->chunk_id); } -std::optional< homestore::chunk_num_t > HSHomeObject::get_any_chunk_id(pg_id_t const pg_id) { +std::tuple< bool, bool, homestore::chunk_num_t > HSHomeObject::get_any_chunk_id(pg_id_t pg_id) { std::scoped_lock lock_guard(_pg_lock); auto pg_iter = _pg_map.find(pg_id); - RELEASE_ASSERT(pg_iter != _pg_map.end(), "Missing PG info"); + if (pg_iter == _pg_map.end()) { return {false /* pg_found */, false /* shards_found */, 0 /* chunk_id */}; } + HS_PG* pg = static_cast< HS_PG* >(pg_iter->second.get()); - if (pg->any_allocated_chunk_id_.has_value()) { - // it is already cached and use it; - return pg->any_allocated_chunk_id_; + if (pg->any_allocated_chunk_id_.has_value()) { // it is already cached and use it; + return {true /* pg_found */, true /* shards_found */, *pg->any_allocated_chunk_id_}; } auto& shards = pg->shards_; - if (shards.empty()) { return std::nullopt; } + if (shards.empty()) { return {true /* pg_found */, false /* shards_found */, 0 /* chunk_id */}; } + auto hs_shard = d_cast< HS_Shard* >(shards.front().get()); - // cache it; - pg->any_allocated_chunk_id_ = hs_shard->sb_->chunk_id; - return pg->any_allocated_chunk_id_; + pg->any_allocated_chunk_id_ = hs_shard->sb_->chunk_id; // cache it; + return {true /* pg_found */, true /* shards_found */, *pg->any_allocated_chunk_id_}; } HSHomeObject::HS_Shard::HS_Shard(ShardInfo shard_info, homestore::chunk_num_t chunk_id) : diff --git a/src/lib/homestore_backend/index_kv.cpp b/src/lib/homestore_backend/index_kv.cpp index 23c62997..c911bfa7 100644 --- a/src/lib/homestore_backend/index_kv.cpp +++ b/src/lib/homestore_backend/index_kv.cpp @@ -40,8 +40,8 @@ HSHomeObject::recover_index_table(homestore::superblk< homestore::index_table_sb return index_table; } -BlobManager::NullResult HSHomeObject::add_to_index_table(shared< BlobIndexTable > index_table, - const BlobInfo& blob_info) { +std::pair< bool, homestore::btree_status_t > HSHomeObject::add_to_index_table(shared< BlobIndexTable > index_table, + const BlobInfo& blob_info) { BlobRouteKey index_key{BlobRoute{blob_info.shard_id, blob_info.blob_id}}; BlobRouteValue index_value{blob_info.pbas}, existing_value; homestore::BtreeSinglePutRequest put_req{&index_key, &index_value, homestore::btree_put_type::INSERT, @@ -50,13 +50,12 @@ BlobManager::NullResult HSHomeObject::add_to_index_table(shared< BlobIndexTable if (status != homestore::btree_status_t::success) { if (existing_value.pbas().is_valid() || existing_value.pbas() == tombstone_pbas) { // Check if the blob id already exists in the index or its tombstone. - return folly::Unit(); + return {true, status}; } LOGE("Failed to put to index table error {}", status); - return folly::makeUnexpected(BlobError::INDEX_ERROR); } - return folly::Unit(); + return {false, status}; } BlobManager::Result< homestore::MultiBlkId > diff --git a/src/lib/homestore_backend/replication_state_machine.cpp b/src/lib/homestore_backend/replication_state_machine.cpp index 90fd4aec..6e3a98dc 100644 --- a/src/lib/homestore_backend/replication_state_machine.cpp +++ b/src/lib/homestore_backend/replication_state_machine.cpp @@ -82,16 +82,20 @@ void ReplicationStateMachine::on_error(ReplServiceError error, const sisl::blob& } } -homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) { +homestore::ReplResult< homestore::blk_alloc_hints > +ReplicationStateMachine::get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) { const ReplicationMessageHeader* msg_header = r_cast< const ReplicationMessageHeader* >(header.cbytes()); switch (msg_header->msg_type) { case ReplicationMessageType::CREATE_SHARD_MSG: { - auto any_allocated_chunk_id = home_object_->get_any_chunk_id(msg_header->pg_id); - if (!any_allocated_chunk_id.has_value()) { + auto const [pg_found, shards_found, chunk_id] = home_object_->get_any_chunk_id(msg_header->pg_id); + if (!pg_found) { + LOGW("Requesting a chunk for an unknown pg={}, letting the caller retry after sometime", msg_header->pg_id); + return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET); + } else if (!shards_found) { // pg is empty without any shards, we leave the decision the HeapChunkSelector to select a pdev // with most available space and then select one chunk based on that pdev } else { - return home_object_->chunk_selector()->chunk_to_hints(any_allocated_chunk_id.value()); + return home_object_->chunk_selector()->chunk_to_hints(chunk_id); } break; } @@ -105,13 +109,12 @@ homestore::blk_alloc_hints ReplicationStateMachine::get_blk_alloc_hints(sisl::bl } case ReplicationMessageType::PUT_BLOB_MSG: - // TODO fixme return home_object_->blob_put_get_blk_alloc_hints(header, nullptr); + case ReplicationMessageType::DEL_BLOB_MSG: - default: { + default: break; } - } return homestore::blk_alloc_hints(); } diff --git a/src/lib/homestore_backend/replication_state_machine.hpp b/src/lib/homestore_backend/replication_state_machine.hpp index 99282eef..0c966f0c 100644 --- a/src/lib/homestore_backend/replication_state_machine.hpp +++ b/src/lib/homestore_backend/replication_state_machine.hpp @@ -160,7 +160,8 @@ class ReplicationStateMachine : public homestore::ReplDevListener { /// /// @param header Header originally passed with repl_dev::write() api on the leader /// @return Expected to return blk_alloc_hints for this write - homestore::blk_alloc_hints get_blk_alloc_hints(sisl::blob const& header, uint32_t data_size) override; + homestore::ReplResult< homestore::blk_alloc_hints > get_blk_alloc_hints(sisl::blob const& header, + uint32_t data_size) override; /// @brief Called when the replica set is being stopped void on_replica_stop() override; diff --git a/src/lib/homestore_backend/tests/homeobj_cp_tests.cpp b/src/lib/homestore_backend/tests/homeobj_cp_tests.cpp index bbd02002..48b1857f 100644 --- a/src/lib/homestore_backend/tests/homeobj_cp_tests.cpp +++ b/src/lib/homestore_backend/tests/homeobj_cp_tests.cpp @@ -1,5 +1,6 @@ #include "homeobj_fixture.hpp" +#if 0 TEST_F(HomeObjectFixture, HSHomeObjectCPTestBasic) { // Step-1: create a PG and a shard std::vector< std::pair< pg_id_t, shard_id_t > > pg_shard_id_vec; @@ -13,22 +14,16 @@ TEST_F(HomeObjectFixture, HSHomeObjectCPTestBasic) { auto ho = dynamic_cast< HSHomeObject* >(_obj_inst.get()); { // Step-2: write some dirty pg information and add to dirt list; - auto cur_cp = HomeStore::instance()->cp_mgr().cp_guard(); - auto cp_ctx = s_cast< HomeObjCPContext* >(cur_cp->context(homestore::cp_consumer_t::HS_CLIENT)); auto lg = std::unique_lock(ho->_pg_lock); for (auto& [_, pg] : ho->_pg_map) { auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get()); - hs_pg->blob_sequence_num_ = 54321; // fake some random blob seq number to make it dirty; - hs_pg->cache_pg_sb_->blob_sequence_num = hs_pg->blob_sequence_num_; - cp_ctx->add_pg_to_dirty_list(hs_pg->cache_pg_sb_); - hs_pg->blob_sequence_num_ = 54321; // fake some random blob seq number to make it dirty; + hs_pg->durable_entities_.blob_sequence_num = 54321; // fake some random blob seq number to make it dirty; + hs_pg->is_dirty_.store(true); // test multiple update to the dirty list; // only the last update should be kept; - hs_pg->blob_sequence_num_ = 12345; // fake some random blob seq number to make it dirty; - hs_pg->cache_pg_sb_->blob_sequence_num = hs_pg->blob_sequence_num_; - cp_ctx->add_pg_to_dirty_list(hs_pg->cache_pg_sb_); - hs_pg->blob_sequence_num_ = 12345; // fake some random blob seq number to make it dirty; + hs_pg->durable_entities_.blob_sequence_num = 12345; // fake some random blob seq number to make it dirty; + hs_pg->is_dirty_.store(true); } } @@ -53,3 +48,4 @@ TEST_F(HomeObjectFixture, HSHomeObjectCPTestBasic) { } } } +#endif \ No newline at end of file diff --git a/src/lib/homestore_backend/tests/homeobj_fixture.hpp b/src/lib/homestore_backend/tests/homeobj_fixture.hpp index 42e0c7a7..7f779a91 100644 --- a/src/lib/homestore_backend/tests/homeobj_fixture.hpp +++ b/src/lib/homestore_backend/tests/homeobj_fixture.hpp @@ -6,10 +6,12 @@ #include #include -#define protected public #include +// will allow unit tests to access object private/protected for validation; +#define protected public +#define private public + #include "lib/homestore_backend/hs_homeobject.hpp" -#include "lib/homestore_backend/hs_hmobj_cp.hpp" #include "lib/tests/fixture_app.hpp" #include "bits_generator.hpp" using namespace std::chrono_literals; @@ -90,18 +92,20 @@ class HomeObjectFixture : public ::testing::Test { // Keep a copy of random payload to verify later. homeobject::Blob clone{sisl::io_blob_safe(blob_size, alignment), user_key, 42ul}; std::memcpy(clone.body.bytes(), put_blob.body.bytes(), put_blob.body.size()); + + retry: auto b = _obj_inst->blob_manager()->put(shard_id, std::move(put_blob)).get(); + if (!b && b.error() == BlobError::NOT_LEADER) { + LOGINFO("Failed to put blob due to not leader, sleep 1s and retry put", pg_id, shard_id); + std::this_thread::sleep_for(1s); + goto retry; + } + if (!b) { - if (b.error() == BlobError::NOT_LEADER) { - LOGINFO("Failed to put blob due to not leader, sleep 1s and continue", pg_id, shard_id); - std::this_thread::sleep_for(1s); - } else { - LOGERROR("Failed to put blob pg {} shard {}", pg_id, shard_id); - ASSERT_TRUE(false); - } + LOGERROR("Failed to put blob pg {} shard {} error={}", pg_id, shard_id, b.error()); + ASSERT_TRUE(false); continue; } - ASSERT_TRUE(!!b); auto blob_id = b.value(); LOGINFO("Put blob pg {} shard {} blob {} data {}", pg_id, shard_id, blob_id, @@ -138,6 +142,19 @@ class HomeObjectFixture : public ::testing::Test { } } + void verify_obj_count(uint32_t num_pgs, uint32_t shards_per_pg, uint32_t blobs_per_shard, + bool deleted_all = false) { + uint32_t exp_active_blobs = deleted_all ? 0 : shards_per_pg * blobs_per_shard; + uint32_t exp_tombstone_blobs = deleted_all ? shards_per_pg * blobs_per_shard : 0; + + for (uint32_t i = 1; i <= num_pgs; ++i) { + PGStats stats; + _obj_inst->pg_manager()->get_stats(i, stats); + ASSERT_EQ(stats.num_active_objects, exp_active_blobs) << "Active objs stats not correct"; + ASSERT_EQ(stats.num_tombstone_objects, exp_tombstone_blobs) << "Deleted objs stats not correct"; + } + } + void restart() { LOGINFO("Restarting homeobject."); _obj_inst.reset(); diff --git a/src/lib/homestore_backend/tests/hs_blob_tests.cpp b/src/lib/homestore_backend/tests/hs_blob_tests.cpp index 575c0d53..57615b9e 100644 --- a/src/lib/homestore_backend/tests/hs_blob_tests.cpp +++ b/src/lib/homestore_backend/tests/hs_blob_tests.cpp @@ -38,6 +38,9 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) { // Verify all get blobs verify_get_blob(blob_map); + // Verify the stats + verify_obj_count(num_pgs, num_blobs_per_shard, num_shards_per_pg, false /* deleted */); + // for (uint64_t i = 1; i <= num_pgs; i++) { // r_cast< HSHomeObject* >(_obj_inst.get())->print_btree_index(i); // } @@ -51,12 +54,18 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) { // Verify all get blobs after restart verify_get_blob(blob_map); + // Verify the stats after restart + verify_obj_count(num_pgs, num_blobs_per_shard, num_shards_per_pg, false /* deleted */); + // Put blob after restart to test the persistance of blob sequence number put_blob(blob_map, pg_shard_id_vec, num_blobs_per_shard, max_blob_size); // Verify all get blobs with random offset and length. verify_get_blob(blob_map, true /* use_random_offset */); + // Verify the stats after put blobs after restart + verify_obj_count(num_pgs, num_blobs_per_shard * 2, num_shards_per_pg, false /* deleted */); + // Delete all blobs for (const auto& [id, blob] : blob_map) { int64_t shard_id = std::get< 1 >(id), blob_id = std::get< 2 >(id); @@ -65,6 +74,9 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) { LOGINFO("delete blob shard {} blob {}", shard_id, blob_id); } + // Verify the stats after restart + verify_obj_count(num_pgs, num_blobs_per_shard * 2, num_shards_per_pg, true /* deleted */); + // Delete again should have no errors. for (const auto& [id, blob] : blob_map) { int64_t shard_id = std::get< 1 >(id), blob_id = std::get< 2 >(id); @@ -73,6 +85,8 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) { LOGINFO("delete blob shard {} blob {}", shard_id, blob_id); } + verify_obj_count(num_pgs, num_blobs_per_shard * 2, num_shards_per_pg, true /* deleted */); + // After delete all blobs, get should fail for (const auto& [id, blob] : blob_map) { int64_t shard_id = std::get< 1 >(id), blob_id = std::get< 2 >(id); @@ -109,6 +123,9 @@ TEST_F(HomeObjectFixture, BasicPutGetDelBlobWRestart) { auto g = _obj_inst->blob_manager()->get(shard_id, blob_id).get(); ASSERT_TRUE(!g); } + + // Verify the stats after restart + verify_obj_count(num_pgs, num_blobs_per_shard * 2, num_shards_per_pg, true /* deleted */); } TEST_F(HomeObjectFixture, SealShardWithRestart) { diff --git a/src/lib/memory_backend/mem_blob_manager.cpp b/src/lib/memory_backend/mem_blob_manager.cpp index 7642bfe4..0988483d 100644 --- a/src/lib/memory_backend/mem_blob_manager.cpp +++ b/src/lib/memory_backend/mem_blob_manager.cpp @@ -24,7 +24,8 @@ BlobManager::AsyncResult< blob_id_t > MemoryHomeObject::_put_blob(ShardInfo cons auto lg = std::shared_lock(_pg_lock); auto iter = _pg_map.find(_shard.placement_group); RELEASE_ASSERT(iter != _pg_map.end(), "PG not found"); - new_blob_id = iter->second->blob_sequence_num_.fetch_add(1, std::memory_order_relaxed); + iter->second->durable_entities_update( + [&new_blob_id](auto& de) { new_blob_id = de.blob_sequence_num.fetch_add(1, std::memory_order_relaxed); }); } WITH_ROUTE(new_blob_id);