Skip to content

Commit

Permalink
Use homestore 6.x (#159)
Browse files Browse the repository at this point in the history
This PR provides the following features:
* Use homestore 6.x, which supports vetoing a shard create or a blob put
when the PG or the shard, respectively, is not ready yet.

* Add HomeObject metrics and durable entities, so that the blob_id, counts,
etc. are all made durable and loaded after a restart.

* Start the HttpServer and provide metrics, objlife-counter, and settings-related endpoints.

* Fix the test, which did not retry the put when a leader had not yet been elected after restart.
  • Loading branch information
hkadayam authored Apr 1, 2024
1 parent 0f1b2ff commit c6f7d23
Show file tree
Hide file tree
Showing 21 changed files with 525 additions and 322 deletions.
4 changes: 2 additions & 2 deletions conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def build_requirements(self):
self.build_requires("gtest/1.14.0")

def requirements(self):
self.requires("homestore/[~=5, include_prerelease=True]@oss/master")
self.requires("sisl/[~=12, include_prerelease=True]@oss/master")
self.requires("homestore/[~=6.0, include_prerelease=True]@oss/master")
self.requires("sisl/[~=12.1, include_prerelease=True]@oss/master")
self.requires("lz4/1.9.4", override=True)

def validate(self):
Expand Down
16 changes: 9 additions & 7 deletions src/include/homeobject/pg_manager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,15 @@ struct PGInfo {
struct PGStats {
pg_id_t id;
peer_id_t replica_set_uuid;
peer_id_t leader_id; // the leader of this PG from my perspective;
uint32_t num_members; // number of members in this PG;
uint32_t total_shards; // shards allocated on this PG (including open shards)
uint32_t open_shards; // active shards on this PG;
uint32_t avail_open_shards; // total number of shards that could be opened on this PG;
uint64_t used_bytes; // total number of bytes used by all shards on this PG;
uint64_t avail_bytes; // total number of bytes available on this PG;
peer_id_t leader_id; // the leader of this PG from my perspective;
uint32_t num_members; // number of members in this PG;
uint32_t total_shards; // shards allocated on this PG (including open shards)
uint32_t open_shards; // active shards on this PG;
uint32_t avail_open_shards; // total number of shards that could be opened on this PG;
uint64_t used_bytes; // total number of bytes used by all shards on this PG;
uint64_t avail_bytes; // total number of bytes available on this PG;
uint64_t num_active_objects; // total number of active objects on this PG;
uint64_t num_tombstone_objects; // total number of tombstone objects on this PG;
std::vector<
std::tuple< peer_id_t, std::string, uint64_t /* last_commit_lsn */, uint64_t /* last_succ_resp_us_ */ > >
members;
Expand Down
20 changes: 18 additions & 2 deletions src/lib/homeobject_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,28 @@ struct PG {
PG& operator=(PG&& pg) = default;
virtual ~PG() = default;

/// Per-PG counters that are copied into the PG superblock during cp_flush.
struct DurableEntities {
    /// Source of new blob ids: _put_blob takes the next id from it via fetch_add.
    std::atomic< blob_id_t > blob_sequence_num{0ull};

    /// Incremented when a blob put is committed, decremented when a blob delete is committed.
    std::atomic< uint64_t > active_blob_count{0ull};

    /// Incremented when a blob delete is committed.
    std::atomic< uint64_t > tombstone_blob_count{0ull};
};

PGInfo pg_info_;
uint64_t shard_sequence_num_{0};
std::atomic< blob_id_t > blob_sequence_num_{0ull};
std::atomic< bool > is_dirty_{false};
ShardPtrList shards_;

/// Applies `cb` to this PG's durable entities and optionally marks the PG dirty.
///
/// @param cb    callable invoked synchronously with a mutable reference to the DurableEntities.
/// @param dirty when true (the default), is_dirty_ is set once `cb` has returned.
void durable_entities_update(auto&& cb, bool dirty = true) {
    cb(durable_entities_);

    // Release ordering makes the counter updates done by `cb` (typically relaxed atomics) visible to
    // any thread that subsequently observes is_dirty_ == true through an acquiring operation, such as
    // the exchange() performed in cp_flush. A relaxed store would not give that guarantee, so a flush
    // could clear the flag yet persist stale counters.
    if (dirty) { is_dirty_.store(true, std::memory_order_release); }
}

/// Read-only view of this PG's durable entities.
const DurableEntities& durable_entities() const {
    return durable_entities_;
}

protected:
DurableEntities durable_entities_;
};
class HomeObjCPContext;

class HomeObjectImpl : public HomeObject,
public BlobManager,
public PGManager,
Expand Down
3 changes: 2 additions & 1 deletion src/lib/homestore_backend/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
index_kv.cpp
heap_chunk_selector.cpp
replication_state_machine.cpp
hs_hmobj_cp.cpp
hs_cp_callbacks.cpp
hs_http_manager.cpp
$<TARGET_OBJECTS:${PROJECT_NAME}_core>
)
target_link_libraries("${PROJECT_NAME}_homestore" PUBLIC
Expand Down
84 changes: 51 additions & 33 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include "replication_state_machine.hpp"
#include "lib/homeobject_impl.hpp"
#include "lib/blob_route.hpp"
#include "hs_hmobj_cp.hpp"
#include <homestore/homestore.hpp>

SISL_LOGGING_DECL(blobmgr)
Expand Down Expand Up @@ -89,15 +88,12 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HS_PG* >(iter->second.get());
repl_dev = hs_pg->repl_dev_;
new_blob_id = hs_pg->blob_sequence_num_.fetch_add(1, std::memory_order_relaxed);
hs_pg->durable_entities_update(
[&new_blob_id](auto& de) { new_blob_id = de.blob_sequence_num.fetch_add(1, std::memory_order_relaxed); },
false /* dirty */);

hs_pg->cache_pg_sb_->blob_sequence_num = hs_pg->blob_sequence_num_.load();
auto cur_cp = homestore::HomeStore::instance()->cp_mgr().cp_guard();
auto cp_ctx = s_cast< HomeObjCPContext* >(cur_cp->context(homestore::cp_consumer_t::HS_CLIENT));
cp_ctx->add_pg_to_dirty_list(hs_pg->cache_pg_sb_);

RELEASE_ASSERT(new_blob_id < std::numeric_limits< decltype(new_blob_id) >::max(),
"exhausted all available blob ids");
DEBUG_ASSERT_LT(new_blob_id, std::numeric_limits< decltype(new_blob_id) >::max(),
"exhausted all available blob ids");
}

RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
Expand Down Expand Up @@ -188,36 +184,46 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis

auto const blob_id = *(reinterpret_cast< blob_id_t* >(const_cast< uint8_t* >(key.cbytes())));
shared< BlobIndexTable > index_table;
HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(msg_header->pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HS_PG* >(iter->second.get());
index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
if (hs_pg->blob_sequence_num_.load() <= blob_id) {
hs_pg->blob_sequence_num_.store(blob_id + 1);
hs_pg->cache_pg_sb_->blob_sequence_num = hs_pg->blob_sequence_num_.load();

auto cur_cp = homestore::HomeStore::instance()->cp_mgr().cp_guard();
auto cp_ctx = s_cast< HomeObjCPContext* >(cur_cp->context(homestore::cp_consumer_t::HS_CLIENT));
cp_ctx->add_pg_to_dirty_list(hs_pg->cache_pg_sb_);
}
hs_pg = static_cast< HS_PG* >(iter->second.get());
}

index_table = hs_pg->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");

BlobInfo blob_info;
blob_info.shard_id = msg_header->shard_id;
blob_info.blob_id = blob_id;
blob_info.pbas = pbas;

// Write to index table with key {shard id, blob id } and value {pba}.
auto r = add_to_index_table(index_table, blob_info);
if (r.hasError()) {
LOGE("Failed to insert into index table for blob {} err {}", lsn, r.error());
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(r.error())); }
return;
auto const [exist_already, status] = add_to_index_table(index_table, blob_info);
if (!exist_already) {
if (status != homestore::btree_status_t::success) {
LOGE("Failed to insert into index table for blob {} err {}", lsn, enum_name(status));
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::INDEX_ERROR)); }
return;
} else {
// The PG superblock (durable entities) will be persisted as part of the HS_CLIENT Checkpoint, which is
// always done ahead of the Index Checkpoint. Hence, if the index already has this entry, whatever durable
// counters were updated along with it would already have been persisted in the PG superblock. If we were
// to increment now, it would be a duplicate increment; hence we skip the counter update in cases where
// the index entry already exists for this blob put.

// Update the durable counters. We need to update the blob_sequence_num here only for replay case, as the
// number is already updated in the put_blob call.
hs_pg->durable_entities_update([&blob_id](auto& de) {
auto existing_blob_id = de.blob_sequence_num.load();
while ((blob_id > existing_blob_id) &&
!de.blob_sequence_num.compare_exchange_weak(existing_blob_id, blob_id)) {}
de.active_blob_count.fetch_add(1, std::memory_order_relaxed);
});
}
}

if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
}

Expand Down Expand Up @@ -304,8 +310,8 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
});
}

homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header,
cintrusive< homestore::repl_req_ctx >& hs_ctx) {
homestore::ReplResult< homestore::blk_alloc_hints >
HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& hs_ctx) {
repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
if (hs_ctx && hs_ctx->is_proposer) {
ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
Expand All @@ -315,12 +321,16 @@ homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob
if (msg_header->corrupted()) {
LOGE("replication message header is corrupted with crc error shard:{}", msg_header->shard_id);
if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::CHECKSUM_MISMATCH)); }
return {};
return folly::makeUnexpected(homestore::ReplServiceError::FAILED);
}

std::scoped_lock lock_guard(_shard_lock);
auto shard_iter = _shard_map.find(msg_header->shard_id);
RELEASE_ASSERT(shard_iter != _shard_map.end(), "Couldnt find shard id");
if (shard_iter == _shard_map.end()) {
LOGW("Received a blob_put on an unknown shard, underlying engine will retry this later");
return folly::makeUnexpected(homestore::ReplServiceError::RESULT_NOT_EXIST_YET);
}

auto hs_shard = d_cast< HS_Shard* >((*shard_iter->second).get());
BLOGD(msg_header->shard_id, "n/a", "Picked chunk_id={}", hs_shard->sb_->chunk_id);

Expand Down Expand Up @@ -382,12 +392,14 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis

shared< BlobIndexTable > index_table;
shared< homestore::ReplDev > repl_dev;
HSHomeObject::HS_PG* hs_pg{nullptr};
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(msg_header->pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
index_table = static_cast< HS_PG* >(iter->second.get())->index_table_;
repl_dev = static_cast< HS_PG* >(iter->second.get())->repl_dev_;
hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get());
index_table = hs_pg->index_table_;
repl_dev = hs_pg->repl_dev_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
}
Expand All @@ -409,7 +421,13 @@ void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sis
}

auto& multiBlks = r.value();
if (multiBlks != tombstone_pbas) { repl_dev->async_free_blks(lsn, multiBlks); }
if (multiBlks != tombstone_pbas) {
repl_dev->async_free_blks(lsn, multiBlks);
hs_pg->durable_entities_update([](auto& de) {
de.active_blob_count.fetch_sub(1, std::memory_order_relaxed);
de.tombstone_blob_count.fetch_add(1, std::memory_order_relaxed);
});
}

if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
}
Expand Down
58 changes: 58 additions & 0 deletions src/lib/homestore_backend/hs_cp_callbacks.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*********************************************************************************
* Modifications Copyright 2017-2019 eBay Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed
* under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
*********************************************************************************/
#include <vector>
#include <homestore/homestore.hpp>
#include "hs_homeobject.hpp"

using homestore::CP;
using homestore::CPCallbacks;
using homestore::CPContext;

namespace homeobject {

// Checkpoint switchover hook. No per-checkpoint context is created here; a null context is returned.
std::unique_ptr< CPContext > HSHomeObject::MyCPCallbacks::on_switchover_cp(CP* cur_cp, CP* new_cp) {
    return std::unique_ptr< CPContext >{};
}

// Flushes the durable entities of every dirty PG into its superblock.
//
// when cp_flush is called, it means that all the dirty candidates are already in the dirty list.
// new dirty candidates will arrive on next cp's context.
//
// Returns an already-completed future holding true.
folly::Future< bool > HSHomeObject::MyCPCallbacks::cp_flush(CP* cp) {
    std::vector< HSHomeObject::HS_PG* > dirty_pg_list;
    {
        std::shared_lock lock_guard(home_obj_._pg_lock);

        // _pg_map must only be inspected under _pg_lock; querying its size before taking the lock
        // would race with concurrent modifications of the map.
        dirty_pg_list.reserve(home_obj_._pg_map.size());

        for (auto const& [id, pg] : home_obj_._pg_map) {
            auto hs_pg = static_cast< HSHomeObject::HS_PG* >(pg.get());

            // All dirty durable entries are updated in the superblk. We persist outside the pg_lock
            if (!hs_pg->is_dirty_.exchange(false)) { continue; }

            // Snapshot the in-memory counters into the superblock image.
            hs_pg->pg_sb_->blob_sequence_num = hs_pg->durable_entities().blob_sequence_num.load();
            hs_pg->pg_sb_->active_blob_count = hs_pg->durable_entities().active_blob_count.load();
            hs_pg->pg_sb_->tombstone_blob_count = hs_pg->durable_entities().tombstone_blob_count.load();
            dirty_pg_list.push_back(hs_pg);
        }
    }

    // The superblock writes are issued after the lock has been dropped.
    for (auto& hs_pg : dirty_pg_list) {
        hs_pg->pg_sb_.write();
    }
    return folly::makeFuture< bool >(true);
}

// Checkpoint cleanup hook. No work is performed here.
void HSHomeObject::MyCPCallbacks::cp_cleanup(CP* cp) {
}

// Checkpoint progress hook. Always reports 0.
int HSHomeObject::MyCPCallbacks::cp_progress_percent() {
    return 0;
}

} // namespace homeobject
84 changes: 0 additions & 84 deletions src/lib/homestore_backend/hs_hmobj_cp.cpp

This file was deleted.

Loading

0 comments on commit c6f7d23

Please sign in to comment.