Skip to content

Commit

Permalink
Implement blob del API
Browse files Browse the repository at this point in the history
In this PR, additional helper APIs are also added, which will be used
by the GC thread.

Signed-off-by: Jie Yao <[email protected]>
  • Loading branch information
JacksonYao287 committed Oct 18, 2023
1 parent f16ed3f commit e9f0283
Show file tree
Hide file tree
Showing 8 changed files with 251 additions and 26 deletions.
6 changes: 3 additions & 3 deletions src/lib/blob_route.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ namespace homeobject {
// to appear in a different Index should the Blob (Shard) be moved between Pgs.
#pragma pack(1)
/// Key identifying a blob entry in an index table: a key type, the owning shard and the blob id.
/// `to_blob()` exposes the object's raw bytes, so the member order and size are part of the format.
struct BlobRoute {
    /// Whether the entry is a live blob or a tombstone set when the blob was deleted.
    enum KeyType { kTypeTombStone = 0x0, kTypeNormal = 0x1 };
    KeyType keyType;
    shard_id_t shard;
    blob_id_t blob;
    /// Defaulted comparison: compares keyType, then shard, then blob (declaration order).
    auto operator<=>(BlobRoute const&) const = default;
    /// Wraps this object's address and `sizeof(*this)` in a sisl::blob.
    sisl::blob to_blob() const { return sisl::blob{uintptr_cast(const_cast< BlobRoute* >(this)), sizeof(*this)}; }
};
#pragma pack()

Expand Down
77 changes: 74 additions & 3 deletions src/lib/homestore_backend/hs_blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
RELEASE_ASSERT(index_table != nullptr, "Index table instance null");

auto r = get_from_index_table(index_table, shard.id, blob_id);
auto r = get_normal_blkIDs_from_index_table(index_table, shard.id, blob_id);
if (!r) {
LOGWARN("Blob not found in index id {} shard {}", blob_id, shard.id);
return folly::makeUnexpected(r.error());
Expand Down Expand Up @@ -281,8 +281,79 @@ homestore::blk_alloc_hints HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob
return hints;
}

BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const&, blob_id_t) {
return folly::makeUnexpected(BlobError::UNKNOWN);
/// Issues a DEL_BLOB_MSG write on the replication device of the shard's placement group.
///
/// @param shard   shard that owns the blob; supplies the shard id and placement group id
/// @param blob_id id of the blob to delete
/// @return a deferred result: folly::Unit once the request's result holds a value, otherwise
///         the BlobError carried by that result.
BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) {
    auto& pg_id = shard.placement_group;

    // Fetch the PG's replication device under a shared lock on the PG map.
    shared< homestore::ReplDev > repl_dev;
    {
        std::shared_lock pg_guard(_pg_lock);
        auto const iter = _pg_map.find(pg_id);
        RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
        auto* const hs_pg = static_cast< HS_PG* >(iter->second.get());
        repl_dev = hs_pg->repl_dev_;
    }
    RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");

    // Request context whose hdr_buf_ is sized to hold exactly one blob id.
    auto req = repl_result_ctx< BlobManager::Result< BlobInfo > >::make(sizeof(blob_id_t), io_align);

    // Populate every header field first; seal() must come last.
    auto& msg_hdr = req->header_;
    msg_hdr.msg_type = ReplicationMessageType::DEL_BLOB_MSG;
    msg_hdr.payload_size = 0;
    msg_hdr.payload_crc = 0;
    msg_hdr.shard_id = shard.id;
    msg_hdr.pg_id = pg_id;
    msg_hdr.seal();

    sisl::blob hdr_blob;
    hdr_blob.size = sizeof(req->header_);
    hdr_blob.bytes = r_cast< uint8_t* >(&req->header_);

    // The blob id is carried in hdr_buf_, passed below as the second argument.
    memcpy(req->hdr_buf_.bytes, &blob_id, sizeof(blob_id_t));

    // No data payload accompanies a delete, hence the empty sg_list.
    repl_dev->async_alloc_write(hdr_blob, req->hdr_buf_, sisl::sg_list{}, req);

    return req->result().deferValue([](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
        if (result.hasError()) { return folly::makeUnexpected(result.error()); }
        const auto& deleted = result.value();
        LOGTRACEMOD(blobmgr, "Delete blob success, shard_id {} , blob_id {}", deleted.shard_id, deleted.blob_id);

        return folly::Unit();
    });
}

/// Commit handler for a blob delete: moves the blob's index entry to a tombstone and, when a
/// request context is supplied, fulfils its promise with the outcome.
///
/// @param lsn    log sequence number of the committed entry (used for logging only)
/// @param header bytes of a ReplicationMessageHeader; supplies the pg id and shard id
/// @param key    buffer whose leading bytes are read as the blob_id_t being deleted
/// @param hs_ctx request context; may be null, in which case no promise is fulfilled
void HSHomeObject::on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                                      cintrusive< homestore::repl_req_ctx >& hs_ctx) {
    repl_result_ctx< BlobManager::Result< BlobInfo > >* ctx{nullptr};
    if (hs_ctx != nullptr) {
        ctx = boost::static_pointer_cast< repl_result_ctx< BlobManager::Result< BlobInfo > > >(hs_ctx).get();
    }

    auto msg_header = r_cast< ReplicationMessageHeader* >(header.bytes);
    if (msg_header->corrupted()) {
        LOGERROR("replication message header is corrupted with crc error, lsn:{}", lsn);
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(BlobError::CHECKSUM_MISMATCH)); }
        return;
    }

    // Resolve the PG's index table under a shared lock on the PG map.
    shared< BlobIndexTable > index_table;
    {
        std::shared_lock lock_guard(_pg_lock);
        auto iter = _pg_map.find(msg_header->pg_id);
        RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
        index_table = static_cast< HS_PG* >(iter->second.get())->index_table_;
        RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
    }

    BlobInfo blob_info;
    blob_info.shard_id = msg_header->shard_id;
    blob_info.blob_id = *r_cast< blob_id_t* >(key.bytes);

    // set a tombstone for the blob in index table.
    // it is the GC thread's responsibility to delete the tombstone from index table and reclaim the space.
    auto r = move_to_tombstone(index_table, blob_info);
    if (r.hasError()) {
        LOGERROR("Failed to delete blob from index table for blob {} err {}", blob_info.blob_id, r.error());
        // ctx is null whenever hs_ctx was null, so it must be checked before use here as well.
        if (ctx) { ctx->promise_.setValue(folly::makeUnexpected(r.error())); }
        return;
    }

    if (ctx) { ctx->promise_.setValue(BlobManager::Result< BlobInfo >(blob_info)); }
}

void HSHomeObject::compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes,
Expand Down
35 changes: 30 additions & 5 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ class HSHomeObject : public HomeObjectImpl {

// A header is valid only when the magic matches AND the version is not newer than
// blob_header_version; with `||` a wrong magic was accepted whenever the version was small.
bool valid() const { return magic == blob_header_magic && version <= blob_header_version; }
// Formats the header fields (magic, version, hash algorithm, hash bytes, shard id, blob size,
// user key size) into a single line of text.
std::string to_string() {
    return fmt::format("magic={:#x} version={} algo={} hash={} shard={} blob_size={} user_size={}", magic,
                       version, static_cast< uint8_t >(hash_algorithm), hex_bytes(hash, blob_max_hash_len),
                       shard_id, blob_size, user_key_size);
}
};
#pragma pack()
Expand Down Expand Up @@ -162,6 +162,18 @@ class HSHomeObject : public HomeObjectImpl {
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_shard_meta_blk_recover_completed(bool success);

BlobManager::Result< homestore::MultiBlkId >
get_blkIDs_from_index_table_internal(shared< BlobIndexTable > index_table, shard_id_t shard_id, blob_id_t blob_id,
BlobRoute::KeyType keyType) const;

BlobManager::Result< homestore::MultiBlkId >
get_normal_blkIDs_from_index_table(shared< BlobIndexTable > index_table, shard_id_t shard_id,
blob_id_t blob_id) const;

BlobManager::Result< homestore::MultiBlkId >
get_tombstone_blkIDs_from_index_table(shared< BlobIndexTable > index_table, shard_id_t shard_id,
blob_id_t blob_id) const;

public:
using HomeObjectImpl::HomeObjectImpl;
~HSHomeObject();
Expand All @@ -187,6 +199,10 @@ class HSHomeObject : public HomeObjectImpl {
// Blob manager related.
void on_blob_put_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);

void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
cintrusive< homestore::repl_req_ctx >& hs_ctx);

homestore::blk_alloc_hints blob_put_get_blk_alloc_hints(sisl::blob const& header,
cintrusive< homestore::repl_req_ctx >& ctx);
void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size,
Expand All @@ -196,8 +212,17 @@ class HSHomeObject : public HomeObjectImpl {
std::shared_ptr< BlobIndexTable > create_index_table();
std::shared_ptr< BlobIndexTable > recover_index_table(const homestore::superblk< homestore::index_table_sb >& sb);
BlobManager::NullResult add_to_index_table(shared< BlobIndexTable > index_table, const BlobInfo& blob_info);
BlobManager::Result< homestore::MultiBlkId > get_from_index_table(shared< BlobIndexTable > index_table,
shard_id_t shard_id, blob_id_t blob_id) const;
// This is used by GC thread to delete the tombstone from index table
BlobManager::NullResult del_tombstone_from_index_table(shared< BlobIndexTable > index_table,
const BlobInfo& blob_info);

// Used only by tests for now; could be used by the GC thread later.
BlobManager::Result< homestore::MultiBlkId > get_normal_blkIDs(pg_id_t pg_id, shard_id_t shard_id,
blob_id_t blob_id) const;
// Used only by tests for now; could be used by the GC thread later.
BlobManager::Result< homestore::MultiBlkId > get_tombstone_blkIDs(pg_id_t pg_id, shard_id_t shard_id,
blob_id_t blob_id) const;
BlobManager::NullResult move_to_tombstone(shared< BlobIndexTable > index_table, const BlobInfo& blob_info);

void print_btree_index(pg_id_t pg_id);
};
Expand Down
97 changes: 93 additions & 4 deletions src/lib/homestore_backend/index_kv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ HSHomeObject::recover_index_table(const homestore::superblk< homestore::index_ta

BlobManager::NullResult HSHomeObject::add_to_index_table(shared< BlobIndexTable > index_table,
const BlobInfo& blob_info) {
BlobRouteKey index_key{BlobRoute{blob_info.shard_id, blob_info.blob_id}};
BlobRouteKey index_key{BlobRoute{BlobRoute::KeyType::kTypeNormal, blob_info.shard_id, blob_info.blob_id}};
BlobRouteValue index_value{blob_info.pbas};
homestore::BtreeSinglePutRequest put_req{&index_key, &index_value,
homestore::btree_put_type::INSERT_ONLY_IF_NOT_EXISTS};
Expand All @@ -59,9 +59,98 @@ BlobManager::NullResult HSHomeObject::add_to_index_table(shared< BlobIndexTable
return folly::Unit();
}

/// Re-keys a blob's index entry from its normal key to the tombstone key, keeping the value.
///
/// Steps, in this order: get the value stored under the normal key, insert it under the
/// tombstone key (INSERT_ONLY_IF_NOT_EXISTS), then remove the normal key.
///
/// @param index_table index table to operate on
/// @param blob_info   supplies the shard id and blob id that form the key
/// @return folly::Unit on success; BlobError::UNKNOWN_BLOB when the normal key cannot be read;
///         BlobError::INDEX_ERROR when the tombstone insert or the normal-key removal fails.
BlobManager::NullResult HSHomeObject::move_to_tombstone(shared< BlobIndexTable > index_table,
                                                        const BlobInfo& blob_info) {
    // since there may be a crash at any step of this function, we need to make sure the index table is always
    // correct when we recover and reapply the raft log.
    // the sequence here must be get, put and remove, so that we can keep the correctness of the index table.
    // NOTE(review): if this is re-run after a crash between the put and the remove, the put below
    // would presumably fail because the tombstone already exists — confirm replay tolerates that.

    BlobRouteKey index_key{BlobRoute{BlobRoute::KeyType::kTypeNormal, blob_info.shard_id, blob_info.blob_id}};
    BlobRouteValue index_value;
    homestore::BtreeSingleGetRequest get_req{&index_key, &index_value};
    auto status = index_table->get(get_req);
    if (status != homestore::btree_status_t::success) {
        LOGERROR("Failed to get from index table {}", index_key.to_string());
        return folly::makeUnexpected(BlobError::UNKNOWN_BLOB);
    }

    index_key.change_to_tombstone();
    // we set a tombstone in the key, so that GC can benefit from a range query over keys to quickly find
    // the deleted blkIDs
    homestore::BtreeSinglePutRequest put_req{&index_key, &index_value,
                                             homestore::btree_put_type::INSERT_ONLY_IF_NOT_EXISTS};
    status = index_table->put(put_req);
    if (status != homestore::btree_status_t::success) {
        LOGERROR("a tombstone key already exists when trying to move {} to tombstone", index_key.to_string());
        return folly::makeUnexpected(BlobError::INDEX_ERROR);
    }

    // Switch back to the normal key so the original entry can be removed.
    index_key.change_to_normal();
    homestore::BtreeSingleRemoveRequest remove_req{&index_key, &index_value};
    status = index_table->remove(remove_req);
    if (status != homestore::btree_status_t::success) {
        LOGERROR("Failed to remove from index table {}", index_key.to_string());
        return folly::makeUnexpected(BlobError::INDEX_ERROR);
    }

    return folly::Unit();
}

/// Removes the tombstone entry for a blob from the index table. Intended for the GC thread.
///
/// @param index_table index table to operate on
/// @param blob_info   supplies the shard id and blob id that form the tombstone key
/// @return folly::Unit on success; BlobError::INDEX_ERROR when the removal does not succeed.
BlobManager::NullResult HSHomeObject::del_tombstone_from_index_table(shared< BlobIndexTable > index_table,
                                                                     const BlobInfo& blob_info) {
    BlobRouteKey index_key{BlobRoute{BlobRoute::KeyType::kTypeTombStone, blob_info.shard_id, blob_info.blob_id}};
    BlobRouteValue index_value;
    homestore::BtreeSingleRemoveRequest remove_req{&index_key, &index_value};
    auto status = index_table->remove(remove_req);
    if (status != homestore::btree_status_t::success) {
        // This is a remove, not a get; say so in the log.
        LOGERROR("Failed to remove from index table {}", index_key.to_string());
        return folly::makeUnexpected(BlobError::INDEX_ERROR);
    }

    return folly::Unit();
}

/// Looks up the entry stored under the normal (non-tombstone) key for the given shard/blob.
/// Forwards to get_blkIDs_from_index_table_internal with KeyType::kTypeNormal.
BlobManager::Result< homestore::MultiBlkId >
HSHomeObject::get_normal_blkIDs_from_index_table(shared< BlobIndexTable > index_table, shard_id_t shard_id,
                                                 blob_id_t blob_id) const {
    const auto key_type = BlobRoute::KeyType::kTypeNormal;
    return get_blkIDs_from_index_table_internal(index_table, shard_id, blob_id, key_type);
}

/// Looks up the entry stored under the tombstone key for the given shard/blob.
/// Forwards to get_blkIDs_from_index_table_internal with KeyType::kTypeTombStone.
BlobManager::Result< homestore::MultiBlkId >
HSHomeObject::get_tombstone_blkIDs_from_index_table(shared< BlobIndexTable > index_table, shard_id_t shard_id,
                                                    blob_id_t blob_id) const {
    const auto key_type = BlobRoute::KeyType::kTypeTombStone;
    return get_blkIDs_from_index_table_internal(index_table, shard_id, blob_id, key_type);
}

/// Resolves the index table of placement group `pg_id` and looks up the normal-key entry for
/// the given shard/blob in it. Asserts that the PG exists in the PG map.
BlobManager::Result< homestore::MultiBlkId > HSHomeObject::get_normal_blkIDs(pg_id_t pg_id, shard_id_t shard_id,
                                                                             blob_id_t blob_id) const {
    shared< BlobIndexTable > index_table;
    {
        // Shared lock: the PG map is only read here.
        std::shared_lock pg_guard(_pg_lock);
        auto const iter = _pg_map.find(pg_id);
        RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
        auto* const hs_pg = static_cast< HS_PG* >(iter->second.get());
        index_table = hs_pg->index_table_;
    }
    return get_normal_blkIDs_from_index_table(index_table, shard_id, blob_id);
}

/// Resolves the index table of placement group `pg_id` and looks up the tombstone-key entry for
/// the given shard/blob in it. Asserts that the PG exists in the PG map.
BlobManager::Result< homestore::MultiBlkId > HSHomeObject::get_tombstone_blkIDs(pg_id_t pg_id, shard_id_t shard_id,
                                                                                blob_id_t blob_id) const {
    shared< BlobIndexTable > index_table;
    {
        // Shared lock: the PG map is only read here.
        std::shared_lock pg_guard(_pg_lock);
        auto const iter = _pg_map.find(pg_id);
        RELEASE_ASSERT(iter != _pg_map.end(), "PG not found");
        auto* const hs_pg = static_cast< HS_PG* >(iter->second.get());
        index_table = hs_pg->index_table_;
    }
    return get_tombstone_blkIDs_from_index_table(index_table, shard_id, blob_id);
}

BlobManager::Result< homestore::MultiBlkId >
HSHomeObject::get_from_index_table(shared< BlobIndexTable > index_table, shard_id_t shard_id, blob_id_t blob_id) const {
BlobRouteKey index_key{BlobRoute{shard_id, blob_id}};
HSHomeObject::get_blkIDs_from_index_table_internal(shared< BlobIndexTable > index_table, shard_id_t shard_id,
blob_id_t blob_id, BlobRoute::KeyType keyType) const {
BlobRouteKey index_key{BlobRoute{keyType, shard_id, blob_id}};
BlobRouteValue index_value;
homestore::BtreeSingleGetRequest get_req{&index_key, &index_value};
auto status = index_table->get(get_req);
Expand All @@ -78,7 +167,7 @@ void HSHomeObject::print_btree_index(pg_id_t pg_id) {
{
std::shared_lock lock_guard(_pg_lock);
auto iter = _pg_map.find(pg_id);
RELEASE_ASSERT (iter != _pg_map.end(), "Unknown PG");
RELEASE_ASSERT(iter != _pg_map.end(), "Unknown PG");
index_table = static_cast< HS_PG* >(iter->second.get())->index_table_;
RELEASE_ASSERT(index_table != nullptr, "Index table not intialized");
}
Expand Down
8 changes: 4 additions & 4 deletions src/lib/homestore_backend/index_kv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ class BlobRouteKey : public homestore::BtreeKey {
BlobRoute key_;

public:

BlobRouteKey() = default;
BlobRouteKey(const BlobRoute key) : key_(key) {}
BlobRouteKey(const BlobRouteKey& other) : BlobRouteKey(other.serialize(), true) {}
Expand Down Expand Up @@ -54,6 +53,9 @@ class BlobRouteKey : public homestore::BtreeKey {
return os;
}

// Sets the wrapped key's type to tombstone; no other member of the key is written.
void change_to_tombstone() {
    key_.keyType = BlobRoute::kTypeTombStone;
}
// Sets the wrapped key's type to normal; no other member of the key is written.
void change_to_normal() {
    key_.keyType = BlobRoute::kTypeNormal;
}

BlobRoute key() const { return key_; }
};

Expand All @@ -76,9 +78,7 @@ class BlobRouteValue : public homestore::BtreeValue {
}

uint32_t serialized_size() const override { return pbas_.serialized_size(); }
// Fixed serialized size of a value: the expected serialized size of a MultiBlkId with one piece.
static uint32_t get_fixed_size() { return homestore::MultiBlkId::expected_serialized_size(1 /* num_pieces */); }

void deserialize(const sisl::blob& b, bool copy) override { pbas_.deserialize(b, copy); }
std::string to_string() const override { return fmt::format("{}", pbas_.to_string()); }
Expand Down
2 changes: 1 addition & 1 deletion src/lib/homestore_backend/replication_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ void ReplicationStateMachine::on_commit(int64_t lsn, const sisl::blob& header, c
break;
}
case ReplicationMessageType::DEL_BLOB_MSG:
home_object_->on_blob_del_commit(lsn, header, key, ctx);
break;
default: {
break;
}
}
}


bool ReplicationStateMachine::on_pre_commit(int64_t lsn, sisl::blob const&, sisl::blob const&,
cintrusive< homestore::repl_req_ctx >&) {
LOGI("on_pre_commit with lsn:{}", lsn);
Expand Down
Loading

0 comments on commit e9f0283

Please sign in to comment.