Add baseline resync read part in homeobject. (#204)
Add read_snapshot_data to go over all shards and blobs
of a PG. If obj_id is zero, send all shards. obj_id
is the concatenation of the shard sequence number and the batch number.
For all other values of obj_id, we send a batch of blobs for a shard.
Once all blobs in a shard are finished, we move to the next shard_id
and reset batch_num to 0. Add an LSN to the shard metadata so that
we ignore reads of shards created later than the snapshot LSN.
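A minimal sketch of the obj_id layout described above, assuming the shard sequence number occupies the upper 48 bits and the batch number the lower 16 (the exact bit split is an assumption for illustration; the message only says "concatenation"):

#include <cstdint>
#include <utility>

// Hypothetical obj_id layout: upper bits carry the shard sequence number,
// the low 16 bits carry the batch number within that shard.
constexpr uint64_t kBatchBits = 16;
constexpr uint64_t kBatchMask = (1ULL << kBatchBits) - 1;

inline uint64_t make_obj_id(uint64_t shard_seq_num, uint64_t batch_num) {
    return (shard_seq_num << kBatchBits) | (batch_num & kBatchMask);
}

// obj_id == 0 is reserved: it requests the PG and shard metadata message.
inline std::pair< uint64_t, uint64_t > split_obj_id(uint64_t obj_id) {
    return {obj_id >> kBatchBits, obj_id & kBatchMask};
}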
sanebay authored Sep 9, 2024
1 parent 1d4ef53 commit d87b8e4
Showing 16 changed files with 457 additions and 23 deletions.
2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.1.1"
version = "2.1.2"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
1 change: 1 addition & 0 deletions src/include/homeobject/shard_manager.hpp
@@ -22,6 +22,7 @@ struct ShardInfo {
shard_id_t id;
pg_id_t placement_group;
State state;
uint64_t lsn;
uint64_t created_time;
uint64_t last_modified_time;
uint64_t available_capacity_bytes;
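The new lsn field lets a snapshot reader skip shards created after the snapshot point. A minimal standalone sketch of that filter (the helper name is hypothetical; the same check appears in PGBlobIterator's constructor below):

#include <cstdint>
#include <vector>

// Keep only shards that already existed at the snapshot LSN.
std::vector< ShardInfo > shards_at_snapshot(std::vector< ShardInfo > const& all, uint64_t snapshot_lsn) {
    std::vector< ShardInfo > out;
    for (auto const& s : all) {
        if (s.lsn <= snapshot_lsn) { out.push_back(s); }
    }
    return out;
}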
2 changes: 2 additions & 0 deletions src/lib/blob_route.hpp
@@ -1,3 +1,5 @@
#pragma once

#include <compare>
#include <functional>

1 change: 1 addition & 0 deletions src/lib/homestore_backend/CMakeLists.txt
@@ -22,6 +22,7 @@ target_sources("${PROJECT_NAME}_homestore" PRIVATE
hs_blob_manager.cpp
hs_shard_manager.cpp
hs_pg_manager.cpp
pg_blob_iterator.cpp
index_kv.cpp
heap_chunk_selector.cpp
replication_state_machine.cpp
6 changes: 6 additions & 0 deletions src/lib/homestore_backend/hs_backend_config.fbs
@@ -9,6 +9,12 @@ attribute "deprecated";
table HSBackendSettings {
// timer thread freq in us
backend_timer_us: uint64 = 60000000 (hotswap);

// Maximum number of blobs in a snapshot batch
max_num_blobs_in_snapshot_batch: uint64 = 1024 (hotswap);

// Maximum size of a snapshot batch
max_snapshot_batch_size_mb: uint64 = 128 (hotswap);
}

root_type HSBackendSettings;
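These two hotswappable settings bound every snapshot batch by blob count and by total bytes; a batch closes as soon as either cap is reached. A standalone sketch of that gating, mirroring the two-sided check in PGBlobIterator::get_next_blobs below (the helper itself is illustrative):

#include <cstddef>
#include <cstdint>
#include <vector>

// Count how many blobs fit in one batch under both caps.
size_t batch_cutoff(std::vector< uint64_t > const& blob_sizes, uint64_t max_num_blobs,
                    uint64_t max_batch_bytes) {
    uint64_t total_bytes{0};
    size_t num_blobs{0};
    for (auto const sz : blob_sizes) {
        if (num_blobs + 1 > max_num_blobs || total_bytes + sz > max_batch_bytes) { break; }
        total_bytes += sz;
        ++num_blobs;
    }
    return num_blobs;
}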
23 changes: 13 additions & 10 deletions src/lib/homestore_backend/hs_blob_manager.cpp
@@ -4,6 +4,7 @@
#include "lib/homeobject_impl.hpp"
#include "lib/blob_route.hpp"
#include <homestore/homestore.hpp>
#include <homestore/blkdata_service.hpp>

SISL_LOGGING_DECL(blobmgr)

@@ -162,9 +163,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
return req->result().deferValue([this, req, repl_dev](const auto& result) -> BlobManager::AsyncResult< blob_id_t > {
if (result.hasError()) {
auto err = result.error();
if (err.getCode() == BlobErrorCode::NOT_LEADER) {
err.current_leader = repl_dev->get_leader_id();
}
if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); }
return folly::makeUnexpected(err);
}
auto blob_info = result.value();
@@ -259,18 +258,24 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
return folly::makeUnexpected(r.error());
}

auto const blkid = r.value();
return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/);
}

BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< homestore::ReplDev >& repl_dev,
shard_id_t shard_id, blob_id_t blob_id,
uint64_t req_offset, uint64_t req_len,
const homestore::MultiBlkId& blkid) const {
auto const total_size = blkid.blk_count() * repl_dev->get_blk_size();
sisl::io_blob_safe read_buf{total_size, io_align};

sisl::sg_list sgs;
sgs.size = total_size;
sgs.iovs.emplace_back(iovec{.iov_base = read_buf.bytes(), .iov_len = read_buf.size()});

BLOGT(shard.id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes());
BLOGT(shard_id, blob_id, "Blob get request: blkid={}, buf={}", blkid.to_string(), (void*)read_buf.bytes());
return repl_dev->async_read(blkid, sgs, total_size)
.thenValue([this, blob_id, shard_id = shard.id, req_len, req_offset, blkid,
read_buf = std::move(read_buf), repl_dev](auto&& result) mutable -> BlobManager::AsyncResult< Blob > {
.thenValue([this, blob_id, shard_id, req_len, req_offset, blkid, repl_dev,
read_buf = std::move(read_buf)](auto&& result) mutable -> BlobManager::AsyncResult< Blob > {
if (result) {
BLOGE(shard_id, blob_id, "Failed to get blob: err={}", blob_id, shard_id, result.value());
return folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED));
@@ -379,9 +384,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
return req->result().deferValue([repl_dev](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
if (result.hasError()) {
auto err = result.error();
if (err.getCode() == BlobErrorCode::NOT_LEADER) {
err.current_leader = repl_dev->get_leader_id();
}
if (err.getCode() == BlobErrorCode::NOT_LEADER) { err.current_leader = repl_dev->get_leader_id(); }
return folly::makeUnexpected(err);
}
auto blob_info = result.value();
36 changes: 36 additions & 0 deletions src/lib/homestore_backend/hs_homeobject.hpp
@@ -11,6 +11,8 @@
#include "heap_chunk_selector.h"
#include "lib/homeobject_impl.hpp"
#include "replication_message.hpp"
#include "homeobject/common.hpp"
#include "index_kv.hpp"

namespace homestore {
struct meta_blk;
@@ -267,6 +269,10 @@ class HSHomeObject : public HomeObjectImpl {
homestore::MultiBlkId pbas;
};

struct BlobInfoData : public BlobInfo {
Blob blob;
};

enum class BlobState : uint8_t {
ALIVE = 0,
TOMBSTONE = 1,
@@ -275,6 +281,26 @@

inline const static homestore::MultiBlkId tombstone_pbas{0, 0, 0};

struct PGBlobIterator {
PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t upto_lsn = 0);
PG* get_pg_metadata();
int64_t get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
std::vector< HSHomeObject::BlobInfoData >& blob_vec, bool& end_of_shard);
void create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob);
void create_blobs_snapshot_data(std::vector< HSHomeObject::BlobInfoData >& blob_vec,
sisl::io_blob_safe& data_blob, bool end_of_shard);
bool end_of_scan() const;

uint64_t cur_shard_seq_num_{1};
int64_t cur_blob_id_{-1};
uint64_t max_shard_seq_num_{0};
uint64_t cur_snapshot_batch_num{0};
HSHomeObject& home_obj_;
homestore::group_id_t group_id_;
pg_id_t pg_id_;
shared< homestore::ReplDev > repl_dev_;
};

private:
shared< HeapChunkSelector > chunk_selector_;
unique< HttpManager > http_mgr_;
@@ -286,6 +312,11 @@
private:
static homestore::ReplicationService& hs_repl_service() { return homestore::hs()->repl_service(); }

// blob related
BlobManager::AsyncResult< Blob > _get_blob_data(const shared< homestore::ReplDev >& repl_dev, shard_id_t shard_id,
blob_id_t blob_id, uint64_t req_offset, uint64_t req_len,
const homestore::MultiBlkId& blkid) const;

// create pg related
PGManager::NullAsyncResult do_create_pg(cshared< homestore::ReplDev > repl_dev, PGInfo&& pg_info);
static std::string serialize_pg_info(const PGInfo& info);
@@ -414,6 +445,11 @@ class HSHomeObject : public HomeObjectImpl {
const BlobInfo& blob_info);
void print_btree_index(pg_id_t pg_id);

shared< BlobIndexTable > get_index_table(pg_id_t pg_id);

BlobManager::Result< std::vector< BlobInfo > >
query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id, uint64_t max_num_in_batch);

// Zero padding buffer related.
size_t max_pad_size() const;
sisl::io_blob_safe& get_pad_buf(uint32_t pad_len);
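A hedged sketch of how a leader-side snapshot read might drive PGBlobIterator, using only the interface declared above (the caps and error handling are simplified; shipping the buffers to the follower is out of scope here):

// Walk one PG snapshot end to end. Illustrative only.
void scan_pg_snapshot(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t snapshot_lsn) {
    HSHomeObject::PGBlobIterator iter{home_obj, group_id, snapshot_lsn};

    sisl::io_blob_safe meta_blob;
    iter.create_pg_shard_snapshot_data(meta_blob); // obj_id == 0: PG and shard metadata

    while (!iter.end_of_scan()) {
        std::vector< HSHomeObject::BlobInfoData > blob_vec;
        bool end_of_shard{false};
        if (iter.get_next_blobs(1024 /* max blobs */, 128ull << 20 /* max bytes */, blob_vec, end_of_shard) < 0) {
            return; // read failed; a real caller would fail or retry the resync
        }
        sisl::io_blob_safe data_blob;
        iter.create_blobs_snapshot_data(blob_vec, data_blob, end_of_shard);
        // send data_blob to the requesting follower here
    }
}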
6 changes: 5 additions & 1 deletion src/lib/homestore_backend/hs_shard_manager.cpp
@@ -67,6 +67,7 @@ std::string HSHomeObject::serialize_shard_info(const ShardInfo& info) {
j["shard_info"]["shard_id_t"] = info.id;
j["shard_info"]["pg_id_t"] = info.placement_group;
j["shard_info"]["state"] = info.state;
j["shard_info"]["lsn"] = info.lsn;
j["shard_info"]["created_time"] = info.created_time;
j["shard_info"]["modified_time"] = info.last_modified_time;
j["shard_info"]["total_capacity"] = info.total_capacity_bytes;
@@ -81,6 +82,7 @@ ShardInfo HSHomeObject::deserialize_shard_info(const char* json_str, size_t str_
shard_info.id = shard_json["shard_info"]["shard_id_t"].get< shard_id_t >();
shard_info.placement_group = shard_json["shard_info"]["pg_id_t"].get< pg_id_t >();
shard_info.state = static_cast< ShardInfo::State >(shard_json["shard_info"]["state"].get< int >());
shard_info.lsn = shard_json["shard_info"]["lsn"].get< uint64_t >();
shard_info.created_time = shard_json["shard_info"]["created_time"].get< uint64_t >();
shard_info.last_modified_time = shard_json["shard_info"]["modified_time"].get< uint64_t >();
shard_info.available_capacity_bytes = shard_json["shard_info"]["available_capacity"].get< uint64_t >();
@@ -116,6 +118,7 @@ ShardManager::AsyncResult< ShardInfo > HSHomeObject::_create_shard(pg_id_t pg_ow
sb->info = ShardInfo{.id = new_shard_id,
.placement_group = pg_owner,
.state = ShardInfo::State::OPEN,
.lsn = 0,
.created_time = create_time,
.last_modified_time = create_time,
.available_capacity_bytes = size_bytes,
@@ -313,7 +316,8 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
switch (header->msg_type) {
case ReplicationMessageType::CREATE_SHARD_MSG: {
auto sb = r_cast< shard_info_superblk const* >(h.cbytes() + sizeof(ReplicationMessageHeader));
auto const shard_info = sb->info;
auto shard_info = sb->info;
shard_info.lsn = lsn;

bool shard_exist = false;
{
37 changes: 37 additions & 0 deletions src/lib/homestore_backend/index_kv.cpp
@@ -111,4 +111,41 @@ void HSHomeObject::print_btree_index(pg_id_t pg_id) {
index_table->dump_tree_to_file();
}

shared< BlobIndexTable > HSHomeObject::get_index_table(pg_id_t pg_id) {
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
auto hs_pg = static_cast< HSHomeObject::HS_PG* >(iter->second.get());
RELEASE_ASSERT(hs_pg->index_table_ != nullptr, "Index table not found for PG");
return hs_pg->index_table_;
}

BlobManager::Result< std::vector< HSHomeObject::BlobInfo > >
HSHomeObject::query_blobs_in_shard(pg_id_t pg_id, uint64_t cur_shard_seq_num, blob_id_t start_blob_id,
uint64_t max_num_in_batch) {
// Query all blobs from start_blob_id to the maximum blob_id value.
std::vector< std::pair< BlobRouteKey, BlobRouteValue > > out_vector;
auto shard_id = make_new_shard_id(pg_id, cur_shard_seq_num);
auto start_key = BlobRouteKey{BlobRoute{shard_id, start_blob_id}};
auto end_key = BlobRouteKey{BlobRoute{shard_id, std::numeric_limits< uint64_t >::max()}};
homestore::BtreeQueryRequest< BlobRouteKey > query_req{
homestore::BtreeKeyRange< BlobRouteKey >{std::move(start_key), true /* inclusive */, std::move(end_key),
true /* inclusive */},
homestore::BtreeQueryType::SWEEP_NON_INTRUSIVE_PAGINATION_QUERY, static_cast< uint32_t >(max_num_in_batch)};
auto index_table = get_index_table(pg_id);
auto const ret = index_table->query(query_req, out_vector);
if (ret != homestore::btree_status_t::success && ret != homestore::btree_status_t::has_more) {
LOGE("Failed to query blobs in index table for ret={} shard={} start_blob_id={}", ret, shard_id, start_blob_id);
return folly::makeUnexpected(BlobErrorCode::INDEX_ERROR);
}

std::vector< BlobInfo > blob_info_vec;
blob_info_vec.reserve(out_vector.size());
for (auto& [r, v] : out_vector) {
blob_info_vec.push_back(BlobInfo{r.key().shard, r.key().blob, v.pbas()});
}

return blob_info_vec;
}

} // namespace homeobject
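query_blobs_in_shard pages through the index max_num_in_batch entries at a time, so a caller resumes from the last returned blob_id + 1, exactly as get_next_blobs does below. A minimal sketch of that resume loop (the driver function is hypothetical):

// Drain every indexed blob of one shard, one page at a time.
void drain_shard(HSHomeObject& home_obj, pg_id_t pg_id, uint64_t shard_seq_num) {
    blob_id_t start_blob_id{0};
    while (true) {
        auto r = home_obj.query_blobs_in_shard(pg_id, shard_seq_num, start_blob_id, 1024 /* page size */);
        if (!r) { return; } // index error
        if (r->empty()) { break; } // shard exhausted
        for (auto const& info : r.value()) {
            // consume info.pbas here (skip tombstone_pbas as the iterator does)
            start_blob_id = info.blob_id + 1;
        }
    }
}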
129 changes: 129 additions & 0 deletions src/lib/homestore_backend/pg_blob_iterator.cpp
@@ -0,0 +1,129 @@
#include "hs_homeobject.hpp"
#include <sisl/logging/logging.h>
#include <sisl/options/options.h>
#include <sisl/settings/settings.hpp>
#include "generated/resync_pg_shard_generated.h"
#include "generated/resync_blob_data_generated.h"

namespace homeobject {

HSHomeObject::PGBlobIterator::PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id,
uint64_t upto_lsn) :
home_obj_(home_obj), group_id_(group_id) {
auto pg = get_pg_metadata();
pg_id_ = pg->pg_info_.id;
repl_dev_ = static_cast< HS_PG* >(pg)->repl_dev_;
if (upto_lsn != 0) {
// Iterate all shards and its blob which have lsn <= upto_lsn
for (auto& shard : pg->shards_) {
auto sequence_num = home_obj_.get_sequence_num_from_shard_id(shard->info.id);
if (shard->info.lsn <= upto_lsn) { max_shard_seq_num_ = std::max(max_shard_seq_num_, sequence_num); }
}
} else {
max_shard_seq_num_ = pg->shard_sequence_num_;
}
}

PG* HSHomeObject::PGBlobIterator::get_pg_metadata() {
std::scoped_lock lock_guard(home_obj_._pg_lock);
auto iter = home_obj_._pg_map.begin();
for (; iter != home_obj_._pg_map.end(); iter++) {
if (iter->second->pg_info_.replica_set_uuid == group_id_) { break; }
}

RELEASE_ASSERT(iter != home_obj_._pg_map.end(), "PG not found replica_set_uuid={}",
boost::uuids::to_string(group_id_));
return iter->second.get();
}

void HSHomeObject::PGBlobIterator::create_pg_shard_snapshot_data(sisl::io_blob_safe& meta_blob) {
auto pg = get_pg_metadata();
auto& pg_info = pg->pg_info_;
auto& pg_shards = pg->shards_;

flatbuffers::FlatBufferBuilder builder;
std::vector< std::uint8_t > uuid(pg_info.replica_set_uuid.size());
std::copy(pg_info.replica_set_uuid.begin(), pg_info.replica_set_uuid.end(), uuid.begin());
auto pg_entry = CreatePGInfoEntry(builder, pg_info.id, 0 /* priority*/, builder.CreateVector(uuid));

std::vector< ::flatbuffers::Offset< ShardInfoEntry > > shard_entries;
for (auto& shard : pg_shards) {
auto& shard_info = shard->info;
// TODO add lsn.
shard_entries.push_back(CreateShardInfoEntry(
builder, static_cast< uint8_t >(shard_info.state), shard_info.placement_group, shard_info.id,
shard_info.total_capacity_bytes, shard_info.created_time, shard_info.last_modified_time));
}
builder.FinishSizePrefixed(CreateResyncPGShardInfo(builder, pg_entry, builder.CreateVector(shard_entries)));
meta_blob = sisl::io_blob_safe{builder.GetSize()};
std::memcpy(meta_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
}

int64_t HSHomeObject::PGBlobIterator::get_next_blobs(uint64_t max_num_blobs_in_batch, uint64_t max_batch_size_bytes,
std::vector< BlobInfoData >& blob_data_vec, bool& end_of_shard) {
end_of_shard = false;
uint64_t total_bytes = 0, num_blobs = 0;
while (true) {
auto r = home_obj_.query_blobs_in_shard(pg_id_, cur_shard_seq_num_, cur_blob_id_ + 1, max_num_blobs_in_batch);
if (!r) { return -1; }
auto& index_results_vec = r.value();
for (auto& info : index_results_vec) {
if (info.pbas == HSHomeObject::tombstone_pbas) {
// Skip deleted blobs
continue;
}
auto result = home_obj_
._get_blob_data(repl_dev_, info.shard_id, info.blob_id, 0 /*start_offset*/,
0 /* req_len */, info.pbas)
.get();
if (!result) {
LOGE("Failed to retrieve blob for shard={} blob={} pbas={}", info.shard_id, info.blob_id,
info.pbas.to_string(), result.error());
return -1;
}

auto& blob = result.value();
num_blobs++;
total_bytes += blob.body.size() + blob.user_key.size();
if (num_blobs > max_num_blobs_in_batch || total_bytes > max_batch_size_bytes) { return 0; }

BlobInfoData blob_data{{info.shard_id, info.blob_id, std::move(info.pbas)}, std::move(blob)};
blob_data_vec.push_back(std::move(blob_data));
cur_blob_id_ = info.blob_id;
}

if (index_results_vec.empty()) {
// We got empty results from index, which means we read
// all the blobs in the current shard
end_of_shard = true;
cur_shard_seq_num_++;
cur_blob_id_ = -1;
break;
}
}

return 0;
}

void HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(std::vector< BlobInfoData >& blob_data_vec,
sisl::io_blob_safe& data_blob, bool end_of_shard) {
std::vector< ::flatbuffers::Offset< BlobData > > blob_entries;
flatbuffers::FlatBufferBuilder builder;
for (auto& b : blob_data_vec) {
blob_entries.push_back(
CreateBlobData(builder, b.shard_id, b.blob_id, b.blob.user_key.size(), b.blob.body.size(),
builder.CreateVector(r_cast< uint8_t* >(const_cast< char* >(b.blob.user_key.data())),
b.blob.user_key.size()),
builder.CreateVector(b.blob.body.bytes(), b.blob.body.size())));
}
builder.FinishSizePrefixed(
CreateResyncBlobDataBatch(builder, builder.CreateVector(blob_entries), end_of_shard /* end_of_batch */));
data_blob = sisl::io_blob_safe{builder.GetSize()};
std::memcpy(data_blob.bytes(), builder.GetBufferPointer(), builder.GetSize());
}

bool HSHomeObject::PGBlobIterator::end_of_scan() const {
return max_shard_seq_num_ == 0 || cur_shard_seq_num_ > max_shard_seq_num_;
}

} // namespace homeobject
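On the receiving side, a follower would decode the size-prefixed buffers with the generated flatbuffers readers. A hedged sketch (the accessor names are assumptions inferred from the CreateResyncBlobDataBatch/CreateBlobData calls above, since the .fbs schema is not part of this diff):

#include <flatbuffers/flatbuffers.h>
#include "generated/resync_blob_data_generated.h"

// Decode one batch produced by create_blobs_snapshot_data. Illustrative only.
void apply_blob_batch(uint8_t const* buf) {
    auto batch = flatbuffers::GetSizePrefixedRoot< ResyncBlobDataBatch >(buf);
    for (auto const* blob : *batch->blobs()) {
        // persist the blob body for its blob_id into the local shard here
    }
    if (batch->end_of_batch()) {
        // whole shard received: advance to the next shard, reset batch_num to 0
    }
}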
